From 0b5cc026fbe7ccc6de66907be29471562a2d344d Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 6 Aug 2009 12:51:32 +0200 Subject: clean up - session/socket/engine stuff removed --- src/app_thread.cpp | 106 ++++------------------------------------------------- 1 file changed, 8 insertions(+), 98 deletions(-) (limited to 'src/app_thread.cpp') diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 9cc61c7..23a055a 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -28,20 +28,8 @@ #include "app_thread.hpp" #include "context.hpp" #include "err.hpp" -#include "session.hpp" #include "pipe.hpp" #include "config.hpp" -#include "i_api.hpp" -#include "dummy_aggregator.hpp" -#include "fair_aggregator.hpp" -#include "dummy_distributor.hpp" -#include "data_distributor.hpp" -#include "load_balancer.hpp" -#include "p2p.hpp" -#include "pub.hpp" -#include "sub.hpp" -#include "req.hpp" -#include "rep.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -58,37 +46,15 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) : { } -void zmq::app_thread_t::shutdown () -{ - // Deallocate all the sessions associated with the thread. - while (!sessions.empty ()) - sessions [0]->shutdown (); - - delete this; -} - zmq::app_thread_t::~app_thread_t () { -} + // Ask all the sockets to start termination, then wait till it is complete. + for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) + (*it)->stop (); + for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) + delete *it; -void zmq::app_thread_t::attach_session (session_t *session_) -{ - session_->set_index (sessions.size ()); - sessions.push_back (session_); -} - -void zmq::app_thread_t::detach_session (session_t *session_) -{ - // O(1) removal of the session from the list. - sessions_t::size_type i = session_->get_index (); - sessions [i] = sessions [sessions.size () - 1]; - sessions [i]->set_index (i); - sessions.pop_back (); -} - -zmq::i_poller *zmq::app_thread_t::get_poller () -{ - zmq_assert (false); + delete this; } zmq::i_signaler *zmq::app_thread_t::get_signaler () @@ -98,76 +64,20 @@ zmq::i_signaler *zmq::app_thread_t::get_signaler () bool zmq::app_thread_t::is_current () { - return !sessions.empty () && tid == getpid (); + return !sockets.empty () && tid == getpid (); } bool zmq::app_thread_t::make_current () { // If there are object managed by this slot we cannot assign the slot // to a different thread. - if (!sessions.empty ()) + if (!sockets.empty ()) return false; tid = getpid (); return true; } -zmq::i_api *zmq::app_thread_t::create_socket (int type_) -{ - i_mux *mux = NULL; - i_demux *demux = NULL; - session_t *session = NULL; - i_api *api = NULL; - - switch (type_) { - case ZMQ_P2P: - mux = new dummy_aggregator_t; - zmq_assert (mux); - demux = new dummy_distributor_t; - zmq_assert (demux); - session = new session_t (this, this, mux, demux, true, false); - zmq_assert (session); - api = new p2p_t (this, session); - zmq_assert (api); - break; - case ZMQ_PUB: - demux = new data_distributor_t; - zmq_assert (demux); - session = new session_t (this, this, mux, demux, true, false); - zmq_assert (session); - api = new pub_t (this, session); - zmq_assert (api); - break; - case ZMQ_SUB: - mux = new fair_aggregator_t; - zmq_assert (mux); - session = new session_t (this, this, mux, demux, true, false); - zmq_assert (session); - api = new sub_t (this, session); - zmq_assert (api); - break; - case ZMQ_REQ: - // TODO - zmq_assert (false); - api = new req_t (this, session); - zmq_assert (api); - break; - case ZMQ_REP: - // TODO - zmq_assert (false); - api = new rep_t (this, session); - zmq_assert (api); - break; - default: - errno = EINVAL; - return NULL; - } - - attach_session (session); - - return api; -} - void zmq::app_thread_t::process_commands (bool block_) { ypollset_t::signals_t signals; -- cgit v1.2.3