summaryrefslogtreecommitdiff
path: root/src/app_thread.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/app_thread.cpp')
-rw-r--r--src/app_thread.cpp106
1 files changed, 8 insertions, 98 deletions
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 9cc61c7..23a055a 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -28,20 +28,8 @@
#include "app_thread.hpp"
#include "context.hpp"
#include "err.hpp"
-#include "session.hpp"
#include "pipe.hpp"
#include "config.hpp"
-#include "i_api.hpp"
-#include "dummy_aggregator.hpp"
-#include "fair_aggregator.hpp"
-#include "dummy_distributor.hpp"
-#include "data_distributor.hpp"
-#include "load_balancer.hpp"
-#include "p2p.hpp"
-#include "pub.hpp"
-#include "sub.hpp"
-#include "req.hpp"
-#include "rep.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -58,37 +46,15 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) :
{
}
-void zmq::app_thread_t::shutdown ()
-{
- // Deallocate all the sessions associated with the thread.
- while (!sessions.empty ())
- sessions [0]->shutdown ();
-
- delete this;
-}
-
zmq::app_thread_t::~app_thread_t ()
{
-}
+ // Ask all the sockets to start termination, then wait till it is complete.
+ for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
+ (*it)->stop ();
+ for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
+ delete *it;
-void zmq::app_thread_t::attach_session (session_t *session_)
-{
- session_->set_index (sessions.size ());
- sessions.push_back (session_);
-}
-
-void zmq::app_thread_t::detach_session (session_t *session_)
-{
- // O(1) removal of the session from the list.
- sessions_t::size_type i = session_->get_index ();
- sessions [i] = sessions [sessions.size () - 1];
- sessions [i]->set_index (i);
- sessions.pop_back ();
-}
-
-zmq::i_poller *zmq::app_thread_t::get_poller ()
-{
- zmq_assert (false);
+ delete this;
}
zmq::i_signaler *zmq::app_thread_t::get_signaler ()
@@ -98,76 +64,20 @@ zmq::i_signaler *zmq::app_thread_t::get_signaler ()
bool zmq::app_thread_t::is_current ()
{
- return !sessions.empty () && tid == getpid ();
+ return !sockets.empty () && tid == getpid ();
}
bool zmq::app_thread_t::make_current ()
{
// If there are object managed by this slot we cannot assign the slot
// to a different thread.
- if (!sessions.empty ())
+ if (!sockets.empty ())
return false;
tid = getpid ();
return true;
}
-zmq::i_api *zmq::app_thread_t::create_socket (int type_)
-{
- i_mux *mux = NULL;
- i_demux *demux = NULL;
- session_t *session = NULL;
- i_api *api = NULL;
-
- switch (type_) {
- case ZMQ_P2P:
- mux = new dummy_aggregator_t;
- zmq_assert (mux);
- demux = new dummy_distributor_t;
- zmq_assert (demux);
- session = new session_t (this, this, mux, demux, true, false);
- zmq_assert (session);
- api = new p2p_t (this, session);
- zmq_assert (api);
- break;
- case ZMQ_PUB:
- demux = new data_distributor_t;
- zmq_assert (demux);
- session = new session_t (this, this, mux, demux, true, false);
- zmq_assert (session);
- api = new pub_t (this, session);
- zmq_assert (api);
- break;
- case ZMQ_SUB:
- mux = new fair_aggregator_t;
- zmq_assert (mux);
- session = new session_t (this, this, mux, demux, true, false);
- zmq_assert (session);
- api = new sub_t (this, session);
- zmq_assert (api);
- break;
- case ZMQ_REQ:
- // TODO
- zmq_assert (false);
- api = new req_t (this, session);
- zmq_assert (api);
- break;
- case ZMQ_REP:
- // TODO
- zmq_assert (false);
- api = new rep_t (this, session);
- zmq_assert (api);
- break;
- default:
- errno = EINVAL;
- return NULL;
- }
-
- attach_session (session);
-
- return api;
-}
-
void zmq::app_thread_t::process_commands (bool block_)
{
ypollset_t::signals_t signals;