diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2010-02-07 09:14:43 +0100 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-02-07 09:14:43 +0100 |
commit | bbaa494fb569d94c356ddecca7dbf249ffc217cf (patch) | |
tree | 84a6f06b8db50c0d09066c3fdd20507e4f8b780f | |
parent | d21bf21a362cf6d20d8d060bb91ee2fdca1dd88b (diff) |
ZMQII-59: TCP server crashes sometimes when message is send and socket is closed immediately
-rw-r--r-- | src/dispatcher.cpp | 23 | ||||
-rw-r--r-- | src/dispatcher.hpp | 16 | ||||
-rw-r--r-- | src/object.cpp | 12 |
3 files changed, 30 insertions, 21 deletions
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 806866c..167b823 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -67,6 +67,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_, signalers.push_back (io_thread->get_signaler ()); } + // Create the administrative thread. Nothing special is needed. NULL + // is used instead of signaler given that as for now, administrative + // thread doesn't receive any commands. The only thing it is used for + // is sending 'stop' command to I/O threads on shutdown. + signalers.push_back (NULL); + // Create command pipe matrix. command_pipes = new (std::nothrow) command_pipe_t [signalers.size () * signalers.size ()]; @@ -159,6 +165,23 @@ void zmq::dispatcher_t::destroy_socket () delete this; } +void zmq::dispatcher_t::write (int source_, int destination_, + const command_t &command_) +{ + command_pipe_t &pipe = + command_pipes [source_ * signalers.size () + destination_]; + pipe.write (command_); + if (!pipe.flush ()) + signalers [destination_]->signal (source_); +} + +bool zmq::dispatcher_t::read (int source_, int destination_, + command_t *command_) +{ + return command_pipes [source_ * signalers.size () + + destination_].read (command_); +} + zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () { // Check whether thread ID is already assigned. If so, return it. diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 799e65b..2c11c57 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -70,23 +70,11 @@ namespace zmq int thread_slot_count (); // Send command from the source to the destination. - inline void write (int source_, int destination_, - const command_t &command_) - { - command_pipe_t &pipe = - command_pipes [source_ * signalers.size () + destination_]; - pipe.write (command_); - if (!pipe.flush ()) - signalers [destination_]->signal (source_); - } + void write (int source_, int destination_, const command_t &command_); // Receive command from the source. Returns false if there is no // command available. - inline bool read (int source_, int destination_, command_t *command_) - { - return command_pipes [source_ * signalers.size () + - destination_].read (command_); - } + bool read (int source_, int destination_, command_t *command_); // Returns the I/O thread that is the least busy at the moment. // Taskset specifies which I/O threads are eligible (0 = all). diff --git a/src/object.cpp b/src/object.cpp index 20e712a..a977f39 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -146,12 +146,13 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) void zmq::object_t::send_stop () { - // Send command goes always to the current object. To-self pipe is - // used exclusively for sending this command. + // 'stop' command goes always from administrative thread to + // the current object. + int admin_thread_id = dispatcher->thread_slot_count () - 1; command_t cmd; cmd.destination = this; cmd.type = command_t::stop; - dispatcher->write (thread_slot, thread_slot, cmd); + dispatcher->write (admin_thread_id, thread_slot, cmd); } void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_) @@ -314,9 +315,6 @@ void zmq::object_t::process_seqnum () void zmq::object_t::send_command (command_t &cmd_) { int destination_thread_slot = cmd_.destination->get_thread_slot (); - if (destination_thread_slot == thread_slot) - cmd_.destination->process_command (cmd_); - else - dispatcher->write (thread_slot, destination_thread_slot, cmd_); + dispatcher->write (thread_slot, destination_thread_slot, cmd_); } |