From b593ea30833ad5dcacb9076c988aec31b0cf26ec Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Mon, 23 Jan 2012 08:53:57 +0100 Subject: Imported Upstream version 2.1.7 --- src/dist.cpp | 72 ++++++++++++++++++++++++++++++++---------------------------- 1 file changed, 38 insertions(+), 34 deletions(-) (limited to 'src/dist.cpp') diff --git a/src/dist.cpp b/src/dist.cpp index e447bc1..d74e69f 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -25,9 +25,11 @@ #include "err.hpp" #include "own.hpp" #include "msg_content.hpp" +#include "likely.hpp" zmq::dist_t::dist_t (own_t *sink_) : active (0), + eligible (0), more (false), sink (sink_), terminating (false) @@ -41,20 +43,24 @@ zmq::dist_t::~dist_t () void zmq::dist_t::attach (writer_t *pipe_) { - // If we are in the middle of sending a message, let's postpone plugging - // in the pipe. - if (!terminating && more) { - new_pipes.push_back (pipe_); - return; - } - pipe_->set_event_sink (this); - pipes.push_back (pipe_); - pipes.swap (active, pipes.size () - 1); - active++; + // If we are in the middle of sending a message, we'll add new pipe + // into the list of eligible pipes. Otherwise we add it to the list + // of active pipes. + if (more) { + pipes.push_back (pipe_); + pipes.swap (eligible, pipes.size () - 1); + eligible++; + } + else { + pipes.push_back (pipe_); + pipes.swap (active, pipes.size () - 1); + active++; + eligible++; + } - if (terminating) { + if (unlikely (terminating)) { sink->register_term_acks (1); pipe_->terminate (); } @@ -72,21 +78,30 @@ void zmq::dist_t::terminate () void zmq::dist_t::terminated (writer_t *pipe_) { - // Remove the pipe from the list; adjust number of active pipes - // accordingly. + // Remove the pipe from the list; adjust number of active and/or + // eligible pipes accordingly. if (pipes.index (pipe_) < active) active--; + if (pipes.index (pipe_) < eligible) + eligible--; pipes.erase (pipe_); - if (terminating) + if (unlikely (terminating)) sink->unregister_term_ack (); } void zmq::dist_t::activated (writer_t *pipe_) { - // Move the pipe to the list of active pipes. - pipes.swap (pipes.index (pipe_), active); - active++; + // Move the pipe from passive to eligible state. + pipes.swap (pipes.index (pipe_), eligible); + eligible++; + + // If there's no message being sent at the moment, move it to + // the active state. + if (!more) { + pipes.swap (eligible - 1, active); + active++; + } } int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) @@ -97,9 +112,9 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) // Push the message to active pipes. distribute (msg_, flags_); - // If mutlipart message is fully sent, activate new pipes. - if (more && !msg_more) - clear_new_pipes (); + // If multipart message is fully sent, activate all the eligible pipes. + if (!msg_more) + active = eligible; more = msg_more; @@ -173,24 +188,13 @@ bool zmq::dist_t::has_out () bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_) { if (!pipe_->write (msg_)) { + pipes.swap (pipes.index (pipe_), active - 1); active--; - pipes.swap (pipes.index (pipe_), active); + pipes.swap (active, eligible - 1); + eligible--; return false; } if (!(msg_->flags & ZMQ_MSG_MORE)) pipe_->flush (); return true; } - -void zmq::dist_t::clear_new_pipes () -{ - for (new_pipes_t::iterator it = new_pipes.begin (); it != new_pipes.end (); - ++it) { - (*it)->set_event_sink (this); - pipes.push_back (*it); - pipes.swap (active, pipes.size () - 1); - active++; - } - new_pipes.clear (); -} - -- cgit v1.2.3