diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-04-30 06:48:18 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-04-30 06:48:18 +0200 |
commit | eb9bc1b0648d2132e612e2237a0ace47004d6f5c (patch) | |
tree | 5c904725437a8436c428868c907aeeca57b3d0d5 /src/dist.cpp | |
parent | fe2e772dd5d36024a91ce3abb86996599960e078 (diff) |
Message atomicity problem in PUB socket fixed.
Reaching the HWM caused breaking message atomicity when the
flow was reestablished - initial parts of multipart messages
may have been lost.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/dist.cpp')
-rw-r--r-- | src/dist.cpp | 73 |
1 files changed, 40 insertions, 33 deletions
diff --git a/src/dist.cpp b/src/dist.cpp index 093da79..9201294 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -23,9 +23,11 @@ #include "err.hpp" #include "own.hpp" #include "msg.hpp" +#include "likely.hpp" zmq::dist_t::dist_t (own_t *sink_) : active (0), + eligible (0), more (false), sink (sink_), terminating (false) @@ -39,20 +41,24 @@ zmq::dist_t::~dist_t () void zmq::dist_t::attach (writer_t *pipe_) { - // If we are in the middle of sending a message, let's postpone plugging - // in the pipe. - if (!terminating && more) { - new_pipes.push_back (pipe_); - return; - } - pipe_->set_event_sink (this); - pipes.push_back (pipe_); - pipes.swap (active, pipes.size () - 1); - active++; + // If we are in the middle of sending a message, we'll add new pipe + // into the list of eligible pipes. Otherwise we add it to the list + // of active pipes. + if (more) { + pipes.push_back (pipe_); + pipes.swap (eligible, pipes.size () - 1); + eligible++; + } + else { + pipes.push_back (pipe_); + pipes.swap (active, pipes.size () - 1); + active++; + eligible++; + } - if (terminating) { + if (unlikely (terminating)) { sink->register_term_acks (1); pipe_->terminate (); } @@ -70,21 +76,32 @@ void zmq::dist_t::terminate () void zmq::dist_t::terminated (writer_t *pipe_) { - // Remove the pipe from the list; adjust number of active pipes - // accordingly. + // Remove the pipe from the list; adjust number of active and/or + // eligible pipes accordingly. if (pipes.index (pipe_) < active) active--; + if (pipes.index (pipe_) < eligible) + eligible--; pipes.erase (pipe_); - if (terminating) + if (unlikely (terminating)) sink->unregister_term_ack (); } void zmq::dist_t::activated (writer_t *pipe_) { - // Move the pipe to the list of active pipes. - pipes.swap (pipes.index (pipe_), active); - active++; + // If we are in the middle of sending a message, we'll add the pipe + // into the list of eligible pipes. Otherwise we add it to the list + // of active pipes. + if (more) { + pipes.swap (pipes.index (pipe_), eligible); + eligible++; + } + else { + pipes.swap (pipes.index (pipe_), active); + active++; + eligible++; + } } int zmq::dist_t::send (msg_t *msg_, int flags_) @@ -95,9 +112,9 @@ int zmq::dist_t::send (msg_t *msg_, int flags_) // Push the message to active pipes. distribute (msg_, flags_); - // If mutlipart message is fully sent, activate new pipes. - if (more && !msg_more) - clear_new_pipes (); + // If mutlipart message is fully sent, activate all the eligible pipes. + if (!msg_more) + active = eligible; more = msg_more; @@ -141,8 +158,10 @@ bool zmq::dist_t::has_out () bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) { if (!pipe_->write (msg_)) { + pipes.swap (pipes.index (pipe_), active - 1); active--; - pipes.swap (pipes.index (pipe_), active); + pipes.swap (active, eligible - 1); + eligible--; return false; } if (!(msg_->flags () & msg_t::more)) @@ -150,15 +169,3 @@ bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) return true; } -void zmq::dist_t::clear_new_pipes () -{ - for (new_pipes_t::iterator it = new_pipes.begin (); it != new_pipes.end (); - ++it) { - (*it)->set_event_sink (this); - pipes.push_back (*it); - pipes.swap (active, pipes.size () - 1); - active++; - } - new_pipes.clear (); -} - |