summaryrefslogtreecommitdiff
path: root/src/dispatcher.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-02-07 09:14:43 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-02-07 09:14:43 +0100
commitbbaa494fb569d94c356ddecca7dbf249ffc217cf (patch)
tree84a6f06b8db50c0d09066c3fdd20507e4f8b780f /src/dispatcher.cpp
parentd21bf21a362cf6d20d8d060bb91ee2fdca1dd88b (diff)
ZMQII-59: TCP server crashes sometimes when message is send and socket is closed immediately
Diffstat (limited to 'src/dispatcher.cpp')
-rw-r--r--src/dispatcher.cpp23
1 files changed, 23 insertions, 0 deletions
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 806866c..167b823 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -67,6 +67,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
signalers.push_back (io_thread->get_signaler ());
}
+ // Create the administrative thread. Nothing special is needed. NULL
+ // is used instead of signaler given that as for now, administrative
+ // thread doesn't receive any commands. The only thing it is used for
+ // is sending 'stop' command to I/O threads on shutdown.
+ signalers.push_back (NULL);
+
// Create command pipe matrix.
command_pipes = new (std::nothrow) command_pipe_t [signalers.size () *
signalers.size ()];
@@ -159,6 +165,23 @@ void zmq::dispatcher_t::destroy_socket ()
delete this;
}
+void zmq::dispatcher_t::write (int source_, int destination_,
+ const command_t &command_)
+{
+ command_pipe_t &pipe =
+ command_pipes [source_ * signalers.size () + destination_];
+ pipe.write (command_);
+ if (!pipe.flush ())
+ signalers [destination_]->signal (source_);
+}
+
+bool zmq::dispatcher_t::read (int source_, int destination_,
+ command_t *command_)
+{
+ return command_pipes [source_ * signalers.size () +
+ destination_].read (command_);
+}
+
zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()
{
// Check whether thread ID is already assigned. If so, return it.