From eb9bc1b0648d2132e612e2237a0ace47004d6f5c Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 30 Apr 2011 06:48:18 +0200 Subject: Message atomicity problem in PUB socket fixed. Reaching the HWM caused breaking message atomicity when the flow was reestablished - initial parts of multipart messages may have been lost. Signed-off-by: Martin Sustrik --- src/dist.cpp | 73 +++++++++++++++++++++++++++++++++--------------------------- src/dist.hpp | 20 ++++++++--------- 2 files changed, 49 insertions(+), 44 deletions(-) (limited to 'src') diff --git a/src/dist.cpp b/src/dist.cpp index 093da79..9201294 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -23,9 +23,11 @@ #include "err.hpp" #include "own.hpp" #include "msg.hpp" +#include "likely.hpp" zmq::dist_t::dist_t (own_t *sink_) : active (0), + eligible (0), more (false), sink (sink_), terminating (false) @@ -39,20 +41,24 @@ zmq::dist_t::~dist_t () void zmq::dist_t::attach (writer_t *pipe_) { - // If we are in the middle of sending a message, let's postpone plugging - // in the pipe. - if (!terminating && more) { - new_pipes.push_back (pipe_); - return; - } - pipe_->set_event_sink (this); - pipes.push_back (pipe_); - pipes.swap (active, pipes.size () - 1); - active++; + // If we are in the middle of sending a message, we'll add new pipe + // into the list of eligible pipes. Otherwise we add it to the list + // of active pipes. + if (more) { + pipes.push_back (pipe_); + pipes.swap (eligible, pipes.size () - 1); + eligible++; + } + else { + pipes.push_back (pipe_); + pipes.swap (active, pipes.size () - 1); + active++; + eligible++; + } - if (terminating) { + if (unlikely (terminating)) { sink->register_term_acks (1); pipe_->terminate (); } @@ -70,21 +76,32 @@ void zmq::dist_t::terminate () void zmq::dist_t::terminated (writer_t *pipe_) { - // Remove the pipe from the list; adjust number of active pipes - // accordingly. + // Remove the pipe from the list; adjust number of active and/or + // eligible pipes accordingly. if (pipes.index (pipe_) < active) active--; + if (pipes.index (pipe_) < eligible) + eligible--; pipes.erase (pipe_); - if (terminating) + if (unlikely (terminating)) sink->unregister_term_ack (); } void zmq::dist_t::activated (writer_t *pipe_) { - // Move the pipe to the list of active pipes. - pipes.swap (pipes.index (pipe_), active); - active++; + // If we are in the middle of sending a message, we'll add the pipe + // into the list of eligible pipes. Otherwise we add it to the list + // of active pipes. + if (more) { + pipes.swap (pipes.index (pipe_), eligible); + eligible++; + } + else { + pipes.swap (pipes.index (pipe_), active); + active++; + eligible++; + } } int zmq::dist_t::send (msg_t *msg_, int flags_) @@ -95,9 +112,9 @@ int zmq::dist_t::send (msg_t *msg_, int flags_) // Push the message to active pipes. distribute (msg_, flags_); - // If mutlipart message is fully sent, activate new pipes. - if (more && !msg_more) - clear_new_pipes (); + // If mutlipart message is fully sent, activate all the eligible pipes. + if (!msg_more) + active = eligible; more = msg_more; @@ -141,8 +158,10 @@ bool zmq::dist_t::has_out () bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) { if (!pipe_->write (msg_)) { + pipes.swap (pipes.index (pipe_), active - 1); active--; - pipes.swap (pipes.index (pipe_), active); + pipes.swap (active, eligible - 1); + eligible--; return false; } if (!(msg_->flags () & msg_t::more)) @@ -150,15 +169,3 @@ bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) return true; } -void zmq::dist_t::clear_new_pipes () -{ - for (new_pipes_t::iterator it = new_pipes.begin (); it != new_pipes.end (); - ++it) { - (*it)->set_event_sink (this); - pipes.push_back (*it); - pipes.swap (active, pipes.size () - 1); - active++; - } - new_pipes.clear (); -} - diff --git a/src/dist.hpp b/src/dist.hpp index ea05305..fd522b9 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -56,24 +56,22 @@ namespace zmq // Put the message to all active pipes. void distribute (class msg_t *msg_, int flags_); - // Plug in all the delayed pipes. - void clear_new_pipes (); - // List of outbound pipes. typedef array_t pipes_t; pipes_t pipes; - // List of new pipes that were not yet inserted into 'pipes' list. - // These pipes are moves to 'pipes' list once the current multipart - // message is fully sent. This way we avoid sending incomplete messages - // to peers. - typedef std::vector new_pipes_t; - new_pipes_t new_pipes; - // Number of active pipes. All the active pipes are located at the - // beginning of the pipes array. + // beginning of the pipes array. These are the pipes the messages + // can be sent to at the moment. pipes_t::size_type active; + // Number of pipes eligible for sending messages to. This includes all + // the active pipes plus all the pipes that we can in theory send + // messages to (the HWM is not yet reached), but sending a message + // to them would result in partial message being delivered, ie. message + // with initial parts missing. + pipes_t::size_type eligible; + // True if last we are in the middle of a multipart message. bool more; -- cgit v1.2.3