From eb9bc1b0648d2132e612e2237a0ace47004d6f5c Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 30 Apr 2011 06:48:18 +0200 Subject: Message atomicity problem in PUB socket fixed. Reaching the HWM caused breaking message atomicity when the flow was reestablished - initial parts of multipart messages may have been lost. Signed-off-by: Martin Sustrik --- src/dist.cpp | 73 +++++++++++++++++++++++++++++++++--------------------------- 1 file changed, 40 insertions(+), 33 deletions(-) (limited to 'src/dist.cpp') diff --git a/src/dist.cpp b/src/dist.cpp index 093da79..9201294 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -23,9 +23,11 @@ #include "err.hpp" #include "own.hpp" #include "msg.hpp" +#include "likely.hpp" zmq::dist_t::dist_t (own_t *sink_) : active (0), + eligible (0), more (false), sink (sink_), terminating (false) @@ -39,20 +41,24 @@ zmq::dist_t::~dist_t () void zmq::dist_t::attach (writer_t *pipe_) { - // If we are in the middle of sending a message, let's postpone plugging - // in the pipe. - if (!terminating && more) { - new_pipes.push_back (pipe_); - return; - } - pipe_->set_event_sink (this); - pipes.push_back (pipe_); - pipes.swap (active, pipes.size () - 1); - active++; + // If we are in the middle of sending a message, we'll add new pipe + // into the list of eligible pipes. Otherwise we add it to the list + // of active pipes. + if (more) { + pipes.push_back (pipe_); + pipes.swap (eligible, pipes.size () - 1); + eligible++; + } + else { + pipes.push_back (pipe_); + pipes.swap (active, pipes.size () - 1); + active++; + eligible++; + } - if (terminating) { + if (unlikely (terminating)) { sink->register_term_acks (1); pipe_->terminate (); } @@ -70,21 +76,32 @@ void zmq::dist_t::terminate () void zmq::dist_t::terminated (writer_t *pipe_) { - // Remove the pipe from the list; adjust number of active pipes - // accordingly. + // Remove the pipe from the list; adjust number of active and/or + // eligible pipes accordingly. if (pipes.index (pipe_) < active) active--; + if (pipes.index (pipe_) < eligible) + eligible--; pipes.erase (pipe_); - if (terminating) + if (unlikely (terminating)) sink->unregister_term_ack (); } void zmq::dist_t::activated (writer_t *pipe_) { - // Move the pipe to the list of active pipes. - pipes.swap (pipes.index (pipe_), active); - active++; + // If we are in the middle of sending a message, we'll add the pipe + // into the list of eligible pipes. Otherwise we add it to the list + // of active pipes. + if (more) { + pipes.swap (pipes.index (pipe_), eligible); + eligible++; + } + else { + pipes.swap (pipes.index (pipe_), active); + active++; + eligible++; + } } int zmq::dist_t::send (msg_t *msg_, int flags_) @@ -95,9 +112,9 @@ int zmq::dist_t::send (msg_t *msg_, int flags_) // Push the message to active pipes. distribute (msg_, flags_); - // If mutlipart message is fully sent, activate new pipes. - if (more && !msg_more) - clear_new_pipes (); + // If mutlipart message is fully sent, activate all the eligible pipes. + if (!msg_more) + active = eligible; more = msg_more; @@ -141,8 +158,10 @@ bool zmq::dist_t::has_out () bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) { if (!pipe_->write (msg_)) { + pipes.swap (pipes.index (pipe_), active - 1); active--; - pipes.swap (pipes.index (pipe_), active); + pipes.swap (active, eligible - 1); + eligible--; return false; } if (!(msg_->flags () & msg_t::more)) @@ -150,15 +169,3 @@ bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) return true; } -void zmq::dist_t::clear_new_pipes () -{ - for (new_pipes_t::iterator it = new_pipes.begin (); it != new_pipes.end (); - ++it) { - (*it)->set_event_sink (this); - pipes.push_back (*it); - pipes.swap (active, pipes.size () - 1); - active++; - } - new_pipes.clear (); -} - -- cgit v1.2.3