diff options
-rw-r--r-- | src/dist.cpp | 44 | ||||
-rw-r--r-- | src/dist.hpp | 15 |
2 files changed, 54 insertions, 5 deletions
diff --git a/src/dist.cpp b/src/dist.cpp index dd2166a..e447bc1 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -41,6 +41,13 @@ zmq::dist_t::~dist_t () void zmq::dist_t::attach (writer_t *pipe_) { + // If we are in the middle of sending a message, let's postpone plugging + // in the pipe. + if (!terminating && more) { + new_pipes.push_back (pipe_); + return; + } + pipe_->set_event_sink (this); pipes.push_back (pipe_); @@ -84,13 +91,30 @@ void zmq::dist_t::activated (writer_t *pipe_) int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) { + // Is this end of a multipart message? + bool msg_more = msg_->flags & ZMQ_MSG_MORE; + + // Push the message to active pipes. + distribute (msg_, flags_); + + // If mutlipart message is fully sent, activate new pipes. + if (more && !msg_more) + clear_new_pipes (); + + more = msg_more; + + return 0; +} + +void zmq::dist_t::distribute (zmq_msg_t *msg_, int flags_) +{ // If there are no active pipes available, simply drop the message. if (active == 0) { int rc = zmq_msg_close (msg_); zmq_assert (rc == 0); rc = zmq_msg_init (msg_); zmq_assert (rc == 0); - return 0; + return; } msg_content_t *content = (msg_content_t*) msg_->content; @@ -102,7 +126,7 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) i++; int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); - return 0; + return; } // Optimisation for the case when there's only a single pipe @@ -115,7 +139,7 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) } int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); - return 0; + return; } // There are at least 2 destinations for the message. That means we have @@ -139,8 +163,6 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) // Detach the original message from the data buffer. int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); - - return 0; } bool zmq::dist_t::has_out () @@ -160,3 +182,15 @@ bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_) return true; } +void zmq::dist_t::clear_new_pipes () +{ + for (new_pipes_t::iterator it = new_pipes.begin (); it != new_pipes.end (); + ++it) { + (*it)->set_event_sink (this); + pipes.push_back (*it); + pipes.swap (active, pipes.size () - 1); + active++; + } + new_pipes.clear (); +} + diff --git a/src/dist.hpp b/src/dist.hpp index 7eef4ad..ad9767a 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -21,6 +21,8 @@ #ifndef __ZMQ_DIST_HPP_INCLUDED__ #define __ZMQ_DIST_HPP_INCLUDED__ +#include <vector> + #include "array.hpp" #include "pipe.hpp" @@ -51,10 +53,23 @@ namespace zmq // fails. In such a case false is returned. bool write (class writer_t *pipe_, zmq_msg_t *msg_); + // Put the message to all active pipes. + void distribute (zmq_msg_t *msg_, int flags_); + + // Plug in all the delayed pipes. + void clear_new_pipes (); + // List of outbound pipes. typedef array_t <class writer_t> pipes_t; pipes_t pipes; + // List of new pipes that were not yet inserted into 'pipes' list. + // These pipes are moves to 'pipes' list once the current multipart + // message is fully sent. This way we avoid sending incomplete messages + // to peers. + typedef std::vector <class writer_t*> new_pipes_t; + new_pipes_t new_pipes; + // Number of active pipes. All the active pipes are located at the // beginning of the pipes array. pipes_t::size_type active; |