diff options
Diffstat (limited to 'src/dispatcher.cpp')
-rw-r--r-- | src/dispatcher.cpp | 71 |
1 files changed, 28 insertions, 43 deletions
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index b1ba11f..db06d32 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -18,6 +18,7 @@ */ #include <new> +#include <string.h> #include "../include/zmq.h" @@ -48,35 +49,30 @@ zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) : HIBYTE (wsa_data.wVersion) == 2); #endif - // Create application thread proxies. - for (uint32_t i = 0; i != app_threads_; i++) { - app_thread_info_t info; - info.associated = false; - info.app_thread = new (std::nothrow) app_thread_t (this, i); - zmq_assert (info.app_thread); - app_threads.push_back (info); - signalers.push_back (info.app_thread->get_signaler ()); - } + // Initialise the array of signalers. + signalers_count = app_threads_ + io_threads_; + signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count); + zmq_assert (signalers); + memset (signalers, 0, sizeof (signaler_t*) * signalers_count); // Create I/O thread objects. for (uint32_t i = 0; i != io_threads_; i++) { - io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, - i + app_threads_); + io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); zmq_assert (io_thread); io_threads.push_back (io_thread); - signalers.push_back (io_thread->get_signaler ()); + signalers [i] = io_thread->get_signaler (); } - // Create the administrative thread. Nothing special is needed. NULL - // is used instead of signaler given that as for now, administrative - // thread doesn't receive any commands. The only thing it is used for - // is sending 'stop' command to I/O threads on shutdown. - signalers.push_back (NULL); - - // Create command pipe matrix. - command_pipes = new (std::nothrow) command_pipe_t [signalers.size () * - signalers.size ()]; - zmq_assert (command_pipes); + // Create application thread proxies. + for (uint32_t i = 0; i != app_threads_; i++) { + app_thread_info_t info; + info.associated = false; + info.app_thread = new (std::nothrow) app_thread_t (this, + i + io_threads_); + zmq_assert (info.app_thread); + app_threads.push_back (info); + signalers [i + io_threads_] = info.app_thread->get_signaler (); + } // Launch I/O threads. for (uint32_t i = 0; i != io_threads_; i++) @@ -123,12 +119,11 @@ zmq::dispatcher_t::~dispatcher_t () while (!pipes.empty ()) delete *pipes.begin (); - // TODO: Deallocate any commands still in the pipes. Keep in mind that - // simple reading from a pipe and deallocating commands won't do as - // command pipe has template parameter D set to true, meaning that - // read may return false even if there are still commands in the pipe. - delete [] command_pipes; - + // Deallocate the array of pointers to signalers. No special work is + // needed as signalers themselves were deallocated with their + // corresponding (app_/io_) thread objects. + free (signalers); + #ifdef ZMQ_HAVE_WINDOWS // On Windows, uninitialise socket layer. int rc = WSACleanup (); @@ -136,11 +131,6 @@ zmq::dispatcher_t::~dispatcher_t () #endif } -uint32_t zmq::dispatcher_t::thread_slot_count () -{ - return (uint32_t) signalers.size (); -} - zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_) { app_threads_sync.lock (); @@ -213,21 +203,16 @@ void zmq::dispatcher_t::no_sockets (app_thread_t *thread_) app_threads_sync.unlock (); } -void zmq::dispatcher_t::write (uint32_t source_, uint32_t destination_, +void zmq::dispatcher_t::send_command (uint32_t destination_, const command_t &command_) { - command_pipe_t &pipe = - command_pipes [source_ * signalers.size () + destination_]; - pipe.write (command_); - if (!pipe.flush ()) - signalers [destination_]->signal (source_); + signalers [destination_]->send (command_); } -bool zmq::dispatcher_t::read (uint32_t source_, uint32_t destination_, - command_t *command_) +bool zmq::dispatcher_t::recv_command (uint32_t thread_slot_, + command_t *command_, bool block_) { - return command_pipes [source_ * signalers.size () + - destination_].read (command_); + return signalers [thread_slot_]->recv (command_, block_); } zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_) |