summaryrefslogtreecommitdiff
path: root/src/dispatcher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/dispatcher.cpp')
-rw-r--r--src/dispatcher.cpp23
1 files changed, 23 insertions, 0 deletions
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 806866c..167b823 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -67,6 +67,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
signalers.push_back (io_thread->get_signaler ());
}
+ // Create the administrative thread. Nothing special is needed. NULL
+ // is used instead of signaler given that as for now, administrative
+ // thread doesn't receive any commands. The only thing it is used for
+ // is sending 'stop' command to I/O threads on shutdown.
+ signalers.push_back (NULL);
+
// Create command pipe matrix.
command_pipes = new (std::nothrow) command_pipe_t [signalers.size () *
signalers.size ()];
@@ -159,6 +165,23 @@ void zmq::dispatcher_t::destroy_socket ()
delete this;
}
+void zmq::dispatcher_t::write (int source_, int destination_,
+ const command_t &command_)
+{
+ command_pipe_t &pipe =
+ command_pipes [source_ * signalers.size () + destination_];
+ pipe.write (command_);
+ if (!pipe.flush ())
+ signalers [destination_]->signal (source_);
+}
+
+bool zmq::dispatcher_t::read (int source_, int destination_,
+ command_t *command_)
+{
+ return command_pipes [source_ * signalers.size () +
+ destination_].read (command_);
+}
+
zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()
{
// Check whether thread ID is already assigned. If so, return it.