summaryrefslogtreecommitdiff
path: root/src/listener.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/listener.cpp')
-rw-r--r--src/listener.cpp170
1 files changed, 170 insertions, 0 deletions
diff --git a/src/listener.cpp b/src/listener.cpp
new file mode 100644
index 0000000..ae4a80f
--- /dev/null
+++ b/src/listener.cpp
@@ -0,0 +1,170 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "listener.hpp"
+#include "simple_semaphore.hpp"
+#include "zmq_tcp_engine.hpp"
+#include "io_thread.hpp"
+#include "session_stub.hpp"
+#include "session.hpp"
+#include "err.hpp"
+#include "dummy_aggregator.hpp"
+#include "dummy_distributor.hpp"
+
+zs::listener_t::listener_t (io_thread_t *thread_, const char *addr_,
+ session_t *peer_, bool has_in_, bool has_out_, uint64_t taskset_) :
+ io_object_t (thread_),
+ poller (NULL),
+ addr (addr_),
+ peer (peer_),
+ taskset (taskset_),
+ has_in (has_in_),
+ has_out (has_out_)
+{
+}
+
+void zs::listener_t::terminate ()
+{
+ for (session_stubs_t::size_type i = 0; i != session_stubs.size (); i++)
+ session_stubs [i]->terminate ();
+ delete this;
+}
+
+void zs::listener_t::shutdown ()
+{
+ for (session_stubs_t::size_type i = 0; i != session_stubs.size (); i++)
+ session_stubs [i]->shutdown ();
+ delete this;
+}
+
+zs::listener_t::~listener_t ()
+{
+}
+
+void zs::listener_t::got_identity (session_stub_t *session_stub_,
+ const char *identity_)
+{
+ // Get the engine allready disconnected from the stub and poller.
+ i_engine *engine = session_stub_->detach_engine ();
+ zs_assert (engine);
+
+ // Find the corresponding session.
+ session_t *session;
+ sessions_t::iterator it = sessions.find (identity_);
+
+ // Destroy the stub.
+ int i = session_stub_->get_index ();
+ session_stubs [i] = session_stubs [session_stubs.size () - 1];
+ session_stubs [i]->set_index (i);
+ session_stubs.pop_back ();
+ session_stub_->terminate ();
+
+ // If there's no session with the specified identity, create one.
+ if (it != sessions.end ()) {
+ session = it->second;
+ session->inc_seqnum ();
+ }
+ else {
+
+ // Choose an I/O thread with the least load to handle the new session.
+ io_thread_t *io_thread = choose_io_thread (taskset);
+
+ // Create the session and bind it to the I/O thread and peer. Make
+ // sure that the peer session won't get deallocated till it processes
+ // the subsequent bind command.
+ i_mux *mux = new dummy_aggregator_t;
+ zs_assert (mux);
+ i_demux *demux = new dummy_distributor_t;
+ zs_assert (demux);
+ session = new session_t (io_thread, io_thread, mux, demux, false, true);
+ zs_assert (session);
+ session->inc_seqnum ();
+ session->inc_seqnum ();
+ peer->inc_seqnum ();
+ send_reg_and_bind (session, peer, has_in, has_out);
+ }
+
+ // Attach the engine to the session.
+ send_engine (session, engine);
+}
+
+void zs::listener_t::process_reg (simple_semaphore_t *smph_)
+{
+ zs_assert (!poller);
+ poller = get_poller ();
+
+ // Open the listening socket.
+ int rc = tcp_listener.open (addr.c_str ());
+ zs_assert (rc == 0);
+
+ // Unlock the application thread that created the listener.
+ if (smph_)
+ smph_->post ();
+
+ // Start polling for incoming connections.
+ handle = poller->add_fd (tcp_listener.get_fd (), this);
+ poller->set_pollin (handle);
+}
+
+void zs::listener_t::process_unreg (simple_semaphore_t *smph_)
+{
+ // Disassociate listener from the poller.
+ zs_assert (poller);
+ poller->rm_fd (handle);
+ poller = NULL;
+
+ // Unlock the application thread closing the listener.
+ if (smph_)
+ smph_->post ();
+}
+
+void zs::listener_t::in_event ()
+{
+ fd_t fd = tcp_listener.accept ();
+
+ // If connection was reset by the peer in the meantime, just ignore it.
+ // TODO: Handle specific errors like ENFILE/EMFILE etc.
+ if (fd == retired_fd)
+ return;
+
+ // Create an session stub for the engine to take care for it till its
+ // identity is retreived.
+ session_stub_t *session_stub = new session_stub_t (this);
+ zs_assert (session_stub);
+ session_stub->set_index (session_stubs.size ());
+ session_stubs.push_back (session_stub);
+
+ // Create an engine to encaspulate the socket. Engine will register itself
+ // with the stub so the stub will be able to free it in case of shutdown.
+ zmq_tcp_engine_t *engine = new zmq_tcp_engine_t (fd);
+ zs_assert (engine);
+ engine->attach (poller, session_stub);
+}
+
+void zs::listener_t::out_event ()
+{
+ zs_assert (false);
+}
+
+void zs::listener_t::timer_event ()
+{
+ zs_assert (false);
+}
+
+