From a8b410e66c3c75809c8e9c01dd3e35c579f02347 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 8 Aug 2009 16:01:58 +0200 Subject: lockfree interaction patter for 3 theads implemented --- src/app_thread.cpp | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) (limited to 'src/app_thread.cpp') diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 23a055a..3f76970 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -17,6 +17,8 @@ along with this program. If not, see . */ +#include + #include "../include/zmq.h" #if defined ZMQ_HAVE_WINDOWS @@ -26,10 +28,12 @@ #endif #include "app_thread.hpp" -#include "context.hpp" +#include "i_api.hpp" +#include "dispatcher.hpp" #include "err.hpp" #include "pipe.hpp" #include "config.hpp" +#include "socket_base.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -39,8 +43,8 @@ #define ZMQ_DELAY_COMMANDS #endif -zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) : - object_t (context_, thread_slot_), +zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : + object_t (dispatcher_, thread_slot_), tid (0), last_processing_time (0) { @@ -48,13 +52,9 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) : zmq::app_thread_t::~app_thread_t () { - // Ask all the sockets to start termination, then wait till it is complete. - for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) - (*it)->stop (); + // Destroy all the sockets owned by this application thread. for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) delete *it; - - delete this; } zmq::i_signaler *zmq::app_thread_t::get_signaler () @@ -123,9 +123,28 @@ void zmq::app_thread_t::process_commands (bool block_) for (int i = 0; i != thread_slot_count (); i++) { if (signals & (ypollset_t::signals_t (1) << i)) { command_t cmd; - while (context->read (i, get_thread_slot (), &cmd)) + while (dispatcher->read (i, get_thread_slot (), &cmd)) cmd.destination->process_command (cmd); } } } } + +zmq::i_api *zmq::app_thread_t::create_socket (int type_) +{ + // TODO: type is ignored for the time being. + socket_base_t *s = new socket_base_t (this); + zmq_assert (s); + sockets.push_back (s); + return s; +} + +void zmq::app_thread_t::remove_socket (i_api *socket_) +{ + // TODO: To speed this up we can possibly use the system where each socket + // holds its index (see I/O scheduler implementation). + sockets_t::iterator it = std::find (sockets.begin (), sockets.end (), + socket_); + zmq_assert (it != sockets.end ()); + sockets.erase (it); +} -- cgit v1.2.3