From bbaa494fb569d94c356ddecca7dbf249ffc217cf Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 7 Feb 2010 09:14:43 +0100 Subject: ZMQII-59: TCP server crashes sometimes when message is send and socket is closed immediately --- src/dispatcher.cpp | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) (limited to 'src/dispatcher.cpp') diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 806866c..167b823 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -67,6 +67,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_, signalers.push_back (io_thread->get_signaler ()); } + // Create the administrative thread. Nothing special is needed. NULL + // is used instead of signaler given that as for now, administrative + // thread doesn't receive any commands. The only thing it is used for + // is sending 'stop' command to I/O threads on shutdown. + signalers.push_back (NULL); + // Create command pipe matrix. command_pipes = new (std::nothrow) command_pipe_t [signalers.size () * signalers.size ()]; @@ -159,6 +165,23 @@ void zmq::dispatcher_t::destroy_socket () delete this; } +void zmq::dispatcher_t::write (int source_, int destination_, + const command_t &command_) +{ + command_pipe_t &pipe = + command_pipes [source_ * signalers.size () + destination_]; + pipe.write (command_); + if (!pipe.flush ()) + signalers [destination_]->signal (source_); +} + +bool zmq::dispatcher_t::read (int source_, int destination_, + command_t *command_) +{ + return command_pipes [source_ * signalers.size () + + destination_].read (command_); +} + zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () { // Check whether thread ID is already assigned. If so, return it. -- cgit v1.2.3