From 0b5cc026fbe7ccc6de66907be29471562a2d344d Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 6 Aug 2009 12:51:32 +0200 Subject: clean up - session/socket/engine stuff removed --- src/listener.cpp | 170 ------------------------------------------------------- 1 file changed, 170 deletions(-) delete mode 100644 src/listener.cpp (limited to 'src/listener.cpp') diff --git a/src/listener.cpp b/src/listener.cpp deleted file mode 100644 index 823b21b..0000000 --- a/src/listener.cpp +++ /dev/null @@ -1,170 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "listener.hpp" -#include "simple_semaphore.hpp" -#include "zmq_tcp_engine.hpp" -#include "io_thread.hpp" -#include "session_stub.hpp" -#include "session.hpp" -#include "err.hpp" -#include "dummy_aggregator.hpp" -#include "dummy_distributor.hpp" - -zmq::listener_t::listener_t (io_thread_t *thread_, const char *addr_, - session_t *peer_, bool has_in_, bool has_out_, uint64_t taskset_) : - io_object_t (thread_), - poller (NULL), - addr (addr_), - peer (peer_), - taskset (taskset_), - has_in (has_in_), - has_out (has_out_) -{ -} - -void zmq::listener_t::terminate () -{ - for (session_stubs_t::size_type i = 0; i != session_stubs.size (); i++) - session_stubs [i]->terminate (); - delete this; -} - -void zmq::listener_t::shutdown () -{ - for (session_stubs_t::size_type i = 0; i != session_stubs.size (); i++) - session_stubs [i]->shutdown (); - delete this; -} - -zmq::listener_t::~listener_t () -{ -} - -void zmq::listener_t::got_identity (session_stub_t *session_stub_, - const char *identity_) -{ - // Get the engine allready disconnected from the stub and poller. - i_engine *engine = session_stub_->detach_engine (); - zmq_assert (engine); - - // Find the corresponding session. - session_t *session; - sessions_t::iterator it = sessions.find (identity_); - - // Destroy the stub. - int i = session_stub_->get_index (); - session_stubs [i] = session_stubs [session_stubs.size () - 1]; - session_stubs [i]->set_index (i); - session_stubs.pop_back (); - session_stub_->terminate (); - - // If there's no session with the specified identity, create one. - if (it != sessions.end ()) { - session = it->second; - session->inc_seqnum (); - } - else { - - // Choose an I/O thread with the least load to handle the new session. - io_thread_t *io_thread = choose_io_thread (taskset); - - // Create the session and bind it to the I/O thread and peer. Make - // sure that the peer session won't get deallocated till it processes - // the subsequent bind command. - i_mux *mux = new dummy_aggregator_t; - zmq_assert (mux); - i_demux *demux = new dummy_distributor_t; - zmq_assert (demux); - session = new session_t (io_thread, io_thread, mux, demux, false, true); - zmq_assert (session); - session->inc_seqnum (); - session->inc_seqnum (); - peer->inc_seqnum (); - send_reg_and_bind (session, peer, has_in, has_out); - } - - // Attach the engine to the session. - send_engine (session, engine); -} - -void zmq::listener_t::process_reg (simple_semaphore_t *smph_) -{ - zmq_assert (!poller); - poller = get_poller (); - - // Open the listening socket. - int rc = tcp_listener.open (addr.c_str ()); - zmq_assert (rc == 0); - - // Unlock the application thread that created the listener. - if (smph_) - smph_->post (); - - // Start polling for incoming connections. - handle = poller->add_fd (tcp_listener.get_fd (), this); - poller->set_pollin (handle); -} - -void zmq::listener_t::process_unreg (simple_semaphore_t *smph_) -{ - // Disassociate listener from the poller. - zmq_assert (poller); - poller->rm_fd (handle); - poller = NULL; - - // Unlock the application thread closing the listener. - if (smph_) - smph_->post (); -} - -void zmq::listener_t::in_event () -{ - fd_t fd = tcp_listener.accept (); - - // If connection was reset by the peer in the meantime, just ignore it. - // TODO: Handle specific errors like ENFILE/EMFILE etc. - if (fd == retired_fd) - return; - - // Create an session stub for the engine to take care for it till its - // identity is retreived. - session_stub_t *session_stub = new session_stub_t (this); - zmq_assert (session_stub); - session_stub->set_index (session_stubs.size ()); - session_stubs.push_back (session_stub); - - // Create an engine to encaspulate the socket. Engine will register itself - // with the stub so the stub will be able to free it in case of shutdown. - zmq_tcp_engine_t *engine = new zmq_tcp_engine_t (fd); - zmq_assert (engine); - engine->attach (poller, session_stub); -} - -void zmq::listener_t::out_event () -{ - zmq_assert (false); -} - -void zmq::listener_t::timer_event () -{ - zmq_assert (false); -} - - -- cgit v1.2.3