summaryrefslogtreecommitdiff
path: root/src/dist.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/dist.cpp')
-rw-r--r--src/dist.cpp72
1 files changed, 38 insertions, 34 deletions
diff --git a/src/dist.cpp b/src/dist.cpp
index e447bc1..d74e69f 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -25,9 +25,11 @@
#include "err.hpp"
#include "own.hpp"
#include "msg_content.hpp"
+#include "likely.hpp"
zmq::dist_t::dist_t (own_t *sink_) :
active (0),
+ eligible (0),
more (false),
sink (sink_),
terminating (false)
@@ -41,20 +43,24 @@ zmq::dist_t::~dist_t ()
void zmq::dist_t::attach (writer_t *pipe_)
{
- // If we are in the middle of sending a message, let's postpone plugging
- // in the pipe.
- if (!terminating && more) {
- new_pipes.push_back (pipe_);
- return;
- }
-
pipe_->set_event_sink (this);
- pipes.push_back (pipe_);
- pipes.swap (active, pipes.size () - 1);
- active++;
+ // If we are in the middle of sending a message, we'll add new pipe
+ // into the list of eligible pipes. Otherwise we add it to the list
+ // of active pipes.
+ if (more) {
+ pipes.push_back (pipe_);
+ pipes.swap (eligible, pipes.size () - 1);
+ eligible++;
+ }
+ else {
+ pipes.push_back (pipe_);
+ pipes.swap (active, pipes.size () - 1);
+ active++;
+ eligible++;
+ }
- if (terminating) {
+ if (unlikely (terminating)) {
sink->register_term_acks (1);
pipe_->terminate ();
}
@@ -72,21 +78,30 @@ void zmq::dist_t::terminate ()
void zmq::dist_t::terminated (writer_t *pipe_)
{
- // Remove the pipe from the list; adjust number of active pipes
- // accordingly.
+ // Remove the pipe from the list; adjust number of active and/or
+ // eligible pipes accordingly.
if (pipes.index (pipe_) < active)
active--;
+ if (pipes.index (pipe_) < eligible)
+ eligible--;
pipes.erase (pipe_);
- if (terminating)
+ if (unlikely (terminating))
sink->unregister_term_ack ();
}
void zmq::dist_t::activated (writer_t *pipe_)
{
- // Move the pipe to the list of active pipes.
- pipes.swap (pipes.index (pipe_), active);
- active++;
+ // Move the pipe from passive to eligible state.
+ pipes.swap (pipes.index (pipe_), eligible);
+ eligible++;
+
+ // If there's no message being sent at the moment, move it to
+ // the active state.
+ if (!more) {
+ pipes.swap (eligible - 1, active);
+ active++;
+ }
}
int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
@@ -97,9 +112,9 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
// Push the message to active pipes.
distribute (msg_, flags_);
- // If mutlipart message is fully sent, activate new pipes.
- if (more && !msg_more)
- clear_new_pipes ();
+ // If multipart message is fully sent, activate all the eligible pipes.
+ if (!msg_more)
+ active = eligible;
more = msg_more;
@@ -173,24 +188,13 @@ bool zmq::dist_t::has_out ()
bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_)
{
if (!pipe_->write (msg_)) {
+ pipes.swap (pipes.index (pipe_), active - 1);
active--;
- pipes.swap (pipes.index (pipe_), active);
+ pipes.swap (active, eligible - 1);
+ eligible--;
return false;
}
if (!(msg_->flags & ZMQ_MSG_MORE))
pipe_->flush ();
return true;
}
-
-void zmq::dist_t::clear_new_pipes ()
-{
- for (new_pipes_t::iterator it = new_pipes.begin (); it != new_pipes.end ();
- ++it) {
- (*it)->set_event_sink (this);
- pipes.push_back (*it);
- pipes.swap (active, pipes.size () - 1);
- active++;
- }
- new_pipes.clear ();
-}
-