summaryrefslogtreecommitdiff
path: root/src/dispatcher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/dispatcher.cpp')
-rw-r--r--src/dispatcher.cpp71
1 files changed, 28 insertions, 43 deletions
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index b1ba11f..db06d32 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -18,6 +18,7 @@
*/
#include <new>
+#include <string.h>
#include "../include/zmq.h"
@@ -48,35 +49,30 @@ zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) :
HIBYTE (wsa_data.wVersion) == 2);
#endif
- // Create application thread proxies.
- for (uint32_t i = 0; i != app_threads_; i++) {
- app_thread_info_t info;
- info.associated = false;
- info.app_thread = new (std::nothrow) app_thread_t (this, i);
- zmq_assert (info.app_thread);
- app_threads.push_back (info);
- signalers.push_back (info.app_thread->get_signaler ());
- }
+ // Initialise the array of signalers.
+ signalers_count = app_threads_ + io_threads_;
+ signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count);
+ zmq_assert (signalers);
+ memset (signalers, 0, sizeof (signaler_t*) * signalers_count);
// Create I/O thread objects.
for (uint32_t i = 0; i != io_threads_; i++) {
- io_thread_t *io_thread = new (std::nothrow) io_thread_t (this,
- i + app_threads_);
+ io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
zmq_assert (io_thread);
io_threads.push_back (io_thread);
- signalers.push_back (io_thread->get_signaler ());
+ signalers [i] = io_thread->get_signaler ();
}
- // Create the administrative thread. Nothing special is needed. NULL
- // is used instead of signaler given that as for now, administrative
- // thread doesn't receive any commands. The only thing it is used for
- // is sending 'stop' command to I/O threads on shutdown.
- signalers.push_back (NULL);
-
- // Create command pipe matrix.
- command_pipes = new (std::nothrow) command_pipe_t [signalers.size () *
- signalers.size ()];
- zmq_assert (command_pipes);
+ // Create application thread proxies.
+ for (uint32_t i = 0; i != app_threads_; i++) {
+ app_thread_info_t info;
+ info.associated = false;
+ info.app_thread = new (std::nothrow) app_thread_t (this,
+ i + io_threads_);
+ zmq_assert (info.app_thread);
+ app_threads.push_back (info);
+ signalers [i + io_threads_] = info.app_thread->get_signaler ();
+ }
// Launch I/O threads.
for (uint32_t i = 0; i != io_threads_; i++)
@@ -123,12 +119,11 @@ zmq::dispatcher_t::~dispatcher_t ()
while (!pipes.empty ())
delete *pipes.begin ();
- // TODO: Deallocate any commands still in the pipes. Keep in mind that
- // simple reading from a pipe and deallocating commands won't do as
- // command pipe has template parameter D set to true, meaning that
- // read may return false even if there are still commands in the pipe.
- delete [] command_pipes;
-
+ // Deallocate the array of pointers to signalers. No special work is
+ // needed as signalers themselves were deallocated with their
+ // corresponding (app_/io_) thread objects.
+ free (signalers);
+
#ifdef ZMQ_HAVE_WINDOWS
// On Windows, uninitialise socket layer.
int rc = WSACleanup ();
@@ -136,11 +131,6 @@ zmq::dispatcher_t::~dispatcher_t ()
#endif
}
-uint32_t zmq::dispatcher_t::thread_slot_count ()
-{
- return (uint32_t) signalers.size ();
-}
-
zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
{
app_threads_sync.lock ();
@@ -213,21 +203,16 @@ void zmq::dispatcher_t::no_sockets (app_thread_t *thread_)
app_threads_sync.unlock ();
}
-void zmq::dispatcher_t::write (uint32_t source_, uint32_t destination_,
+void zmq::dispatcher_t::send_command (uint32_t destination_,
const command_t &command_)
{
- command_pipe_t &pipe =
- command_pipes [source_ * signalers.size () + destination_];
- pipe.write (command_);
- if (!pipe.flush ())
- signalers [destination_]->signal (source_);
+ signalers [destination_]->send (command_);
}
-bool zmq::dispatcher_t::read (uint32_t source_, uint32_t destination_,
- command_t *command_)
+bool zmq::dispatcher_t::recv_command (uint32_t thread_slot_,
+ command_t *command_, bool block_)
{
- return command_pipes [source_ * signalers.size () +
- destination_].read (command_);
+ return signalers [thread_slot_]->recv (command_, block_);
}
zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_)