summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-02-07 09:14:43 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-02-07 09:14:43 +0100
commitbbaa494fb569d94c356ddecca7dbf249ffc217cf (patch)
tree84a6f06b8db50c0d09066c3fdd20507e4f8b780f
parentd21bf21a362cf6d20d8d060bb91ee2fdca1dd88b (diff)
ZMQII-59: TCP server crashes sometimes when message is send and socket is closed immediately
-rw-r--r--src/dispatcher.cpp23
-rw-r--r--src/dispatcher.hpp16
-rw-r--r--src/object.cpp12
3 files changed, 30 insertions, 21 deletions
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 806866c..167b823 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -67,6 +67,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
signalers.push_back (io_thread->get_signaler ());
}
+ // Create the administrative thread. Nothing special is needed. NULL
+ // is used instead of signaler given that as for now, administrative
+ // thread doesn't receive any commands. The only thing it is used for
+ // is sending 'stop' command to I/O threads on shutdown.
+ signalers.push_back (NULL);
+
// Create command pipe matrix.
command_pipes = new (std::nothrow) command_pipe_t [signalers.size () *
signalers.size ()];
@@ -159,6 +165,23 @@ void zmq::dispatcher_t::destroy_socket ()
delete this;
}
+void zmq::dispatcher_t::write (int source_, int destination_,
+ const command_t &command_)
+{
+ command_pipe_t &pipe =
+ command_pipes [source_ * signalers.size () + destination_];
+ pipe.write (command_);
+ if (!pipe.flush ())
+ signalers [destination_]->signal (source_);
+}
+
+bool zmq::dispatcher_t::read (int source_, int destination_,
+ command_t *command_)
+{
+ return command_pipes [source_ * signalers.size () +
+ destination_].read (command_);
+}
+
zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()
{
// Check whether thread ID is already assigned. If so, return it.
diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp
index 799e65b..2c11c57 100644
--- a/src/dispatcher.hpp
+++ b/src/dispatcher.hpp
@@ -70,23 +70,11 @@ namespace zmq
int thread_slot_count ();
// Send command from the source to the destination.
- inline void write (int source_, int destination_,
- const command_t &command_)
- {
- command_pipe_t &pipe =
- command_pipes [source_ * signalers.size () + destination_];
- pipe.write (command_);
- if (!pipe.flush ())
- signalers [destination_]->signal (source_);
- }
+ void write (int source_, int destination_, const command_t &command_);
// Receive command from the source. Returns false if there is no
// command available.
- inline bool read (int source_, int destination_, command_t *command_)
- {
- return command_pipes [source_ * signalers.size () +
- destination_].read (command_);
- }
+ bool read (int source_, int destination_, command_t *command_);
// Returns the I/O thread that is the least busy at the moment.
// Taskset specifies which I/O threads are eligible (0 = all).
diff --git a/src/object.cpp b/src/object.cpp
index 20e712a..a977f39 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -146,12 +146,13 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
void zmq::object_t::send_stop ()
{
- // Send command goes always to the current object. To-self pipe is
- // used exclusively for sending this command.
+ // 'stop' command goes always from administrative thread to
+ // the current object.
+ int admin_thread_id = dispatcher->thread_slot_count () - 1;
command_t cmd;
cmd.destination = this;
cmd.type = command_t::stop;
- dispatcher->write (thread_slot, thread_slot, cmd);
+ dispatcher->write (admin_thread_id, thread_slot, cmd);
}
void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_)
@@ -314,9 +315,6 @@ void zmq::object_t::process_seqnum ()
void zmq::object_t::send_command (command_t &cmd_)
{
int destination_thread_slot = cmd_.destination->get_thread_slot ();
- if (destination_thread_slot == thread_slot)
- cmd_.destination->process_command (cmd_);
- else
- dispatcher->write (thread_slot, destination_thread_slot, cmd_);
+ dispatcher->write (thread_slot, destination_thread_slot, cmd_);
}