diff options
Diffstat (limited to 'src/listener.cpp')
-rw-r--r-- | src/listener.cpp | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/src/listener.cpp b/src/listener.cpp new file mode 100644 index 0000000..ae4a80f --- /dev/null +++ b/src/listener.cpp @@ -0,0 +1,170 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "listener.hpp" +#include "simple_semaphore.hpp" +#include "zmq_tcp_engine.hpp" +#include "io_thread.hpp" +#include "session_stub.hpp" +#include "session.hpp" +#include "err.hpp" +#include "dummy_aggregator.hpp" +#include "dummy_distributor.hpp" + +zs::listener_t::listener_t (io_thread_t *thread_, const char *addr_, + session_t *peer_, bool has_in_, bool has_out_, uint64_t taskset_) : + io_object_t (thread_), + poller (NULL), + addr (addr_), + peer (peer_), + taskset (taskset_), + has_in (has_in_), + has_out (has_out_) +{ +} + +void zs::listener_t::terminate () +{ + for (session_stubs_t::size_type i = 0; i != session_stubs.size (); i++) + session_stubs [i]->terminate (); + delete this; +} + +void zs::listener_t::shutdown () +{ + for (session_stubs_t::size_type i = 0; i != session_stubs.size (); i++) + session_stubs [i]->shutdown (); + delete this; +} + +zs::listener_t::~listener_t () +{ +} + +void zs::listener_t::got_identity (session_stub_t *session_stub_, + const char *identity_) +{ + // Get the engine allready disconnected from the stub and poller. + i_engine *engine = session_stub_->detach_engine (); + zs_assert (engine); + + // Find the corresponding session. + session_t *session; + sessions_t::iterator it = sessions.find (identity_); + + // Destroy the stub. + int i = session_stub_->get_index (); + session_stubs [i] = session_stubs [session_stubs.size () - 1]; + session_stubs [i]->set_index (i); + session_stubs.pop_back (); + session_stub_->terminate (); + + // If there's no session with the specified identity, create one. + if (it != sessions.end ()) { + session = it->second; + session->inc_seqnum (); + } + else { + + // Choose an I/O thread with the least load to handle the new session. + io_thread_t *io_thread = choose_io_thread (taskset); + + // Create the session and bind it to the I/O thread and peer. Make + // sure that the peer session won't get deallocated till it processes + // the subsequent bind command. + i_mux *mux = new dummy_aggregator_t; + zs_assert (mux); + i_demux *demux = new dummy_distributor_t; + zs_assert (demux); + session = new session_t (io_thread, io_thread, mux, demux, false, true); + zs_assert (session); + session->inc_seqnum (); + session->inc_seqnum (); + peer->inc_seqnum (); + send_reg_and_bind (session, peer, has_in, has_out); + } + + // Attach the engine to the session. + send_engine (session, engine); +} + +void zs::listener_t::process_reg (simple_semaphore_t *smph_) +{ + zs_assert (!poller); + poller = get_poller (); + + // Open the listening socket. + int rc = tcp_listener.open (addr.c_str ()); + zs_assert (rc == 0); + + // Unlock the application thread that created the listener. + if (smph_) + smph_->post (); + + // Start polling for incoming connections. + handle = poller->add_fd (tcp_listener.get_fd (), this); + poller->set_pollin (handle); +} + +void zs::listener_t::process_unreg (simple_semaphore_t *smph_) +{ + // Disassociate listener from the poller. + zs_assert (poller); + poller->rm_fd (handle); + poller = NULL; + + // Unlock the application thread closing the listener. + if (smph_) + smph_->post (); +} + +void zs::listener_t::in_event () +{ + fd_t fd = tcp_listener.accept (); + + // If connection was reset by the peer in the meantime, just ignore it. + // TODO: Handle specific errors like ENFILE/EMFILE etc. + if (fd == retired_fd) + return; + + // Create an session stub for the engine to take care for it till its + // identity is retreived. + session_stub_t *session_stub = new session_stub_t (this); + zs_assert (session_stub); + session_stub->set_index (session_stubs.size ()); + session_stubs.push_back (session_stub); + + // Create an engine to encaspulate the socket. Engine will register itself + // with the stub so the stub will be able to free it in case of shutdown. + zmq_tcp_engine_t *engine = new zmq_tcp_engine_t (fd); + zs_assert (engine); + engine->attach (poller, session_stub); +} + +void zs::listener_t::out_event () +{ + zs_assert (false); +} + +void zs::listener_t::timer_event () +{ + zs_assert (false); +} + + |