From 92c7c18367f91c6341fc617026f5e25000466b05 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 20 Mar 2011 11:50:51 +0100 Subject: Message atomicity problem solved in PUB socket When new peer connects to a PUB socket while it is in the middle of sending of multi-part messages, it gets just the remaining part of the message, i.e. message atomicity is broken. This patch drops the tail part of the message and starts sending to the peer only when new message is started. Signed-off-by: Martin Sustrik --- src/dist.cpp | 44 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 39 insertions(+), 5 deletions(-) (limited to 'src/dist.cpp') diff --git a/src/dist.cpp b/src/dist.cpp index dd2166a..e447bc1 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -41,6 +41,13 @@ zmq::dist_t::~dist_t () void zmq::dist_t::attach (writer_t *pipe_) { + // If we are in the middle of sending a message, let's postpone plugging + // in the pipe. + if (!terminating && more) { + new_pipes.push_back (pipe_); + return; + } + pipe_->set_event_sink (this); pipes.push_back (pipe_); @@ -83,6 +90,23 @@ void zmq::dist_t::activated (writer_t *pipe_) } int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) +{ + // Is this end of a multipart message? + bool msg_more = msg_->flags & ZMQ_MSG_MORE; + + // Push the message to active pipes. + distribute (msg_, flags_); + + // If mutlipart message is fully sent, activate new pipes. + if (more && !msg_more) + clear_new_pipes (); + + more = msg_more; + + return 0; +} + +void zmq::dist_t::distribute (zmq_msg_t *msg_, int flags_) { // If there are no active pipes available, simply drop the message. if (active == 0) { @@ -90,7 +114,7 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) zmq_assert (rc == 0); rc = zmq_msg_init (msg_); zmq_assert (rc == 0); - return 0; + return; } msg_content_t *content = (msg_content_t*) msg_->content; @@ -102,7 +126,7 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) i++; int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); - return 0; + return; } // Optimisation for the case when there's only a single pipe @@ -115,7 +139,7 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) } int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); - return 0; + return; } // There are at least 2 destinations for the message. That means we have @@ -139,8 +163,6 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) // Detach the original message from the data buffer. int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); - - return 0; } bool zmq::dist_t::has_out () @@ -160,3 +182,15 @@ bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_) return true; } +void zmq::dist_t::clear_new_pipes () +{ + for (new_pipes_t::iterator it = new_pipes.begin (); it != new_pipes.end (); + ++it) { + (*it)->set_event_sink (this); + pipes.push_back (*it); + pipes.swap (active, pipes.size () - 1); + active++; + } + new_pipes.clear (); +} + -- cgit v1.2.3