From eb9bc1b0648d2132e612e2237a0ace47004d6f5c Mon Sep 17 00:00:00 2001
From: Martin Sustrik <sustrik@250bpm.com>
Date: Sat, 30 Apr 2011 06:48:18 +0200
Subject: Message atomicity problem in PUB socket fixed.

Reaching the HWM caused breaking message atomicity when the
flow was reestablished - initial parts of multipart messages
may have been lost.

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
---
 src/dist.cpp | 73 +++++++++++++++++++++++++++++++++---------------------------
 src/dist.hpp | 20 ++++++++---------
 2 files changed, 49 insertions(+), 44 deletions(-)

(limited to 'src')

diff --git a/src/dist.cpp b/src/dist.cpp
index 093da79..9201294 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -23,9 +23,11 @@
 #include "err.hpp"
 #include "own.hpp"
 #include "msg.hpp"
+#include "likely.hpp"
 
 zmq::dist_t::dist_t (own_t *sink_) :
     active (0),
+    eligible (0),
     more (false),
     sink (sink_),
     terminating (false)
@@ -39,20 +41,24 @@ zmq::dist_t::~dist_t ()
 
 void zmq::dist_t::attach (writer_t *pipe_)
 {
-    //  If we are in the middle of sending a message, let's postpone plugging
-    //  in the pipe.
-    if (!terminating && more) {
-        new_pipes.push_back (pipe_);
-        return;
-    }
-
     pipe_->set_event_sink (this);
 
-    pipes.push_back (pipe_);
-    pipes.swap (active, pipes.size () - 1);
-    active++;
+    //  If we are in the middle of sending a message, we'll add new pipe
+    //  into the list of eligible pipes. Otherwise we add it to the list
+    //  of active pipes.
+    if (more) {
+        pipes.push_back (pipe_);
+        pipes.swap (eligible, pipes.size () - 1);
+        eligible++;
+    }
+    else {
+        pipes.push_back (pipe_);
+        pipes.swap (active, pipes.size () - 1);
+        active++;
+        eligible++;
+    }
 
-    if (terminating) {
+    if (unlikely (terminating)) {
         sink->register_term_acks (1);
         pipe_->terminate ();
     }
@@ -70,21 +76,32 @@ void zmq::dist_t::terminate ()
 
 void zmq::dist_t::terminated (writer_t *pipe_)
 {
-    //  Remove the pipe from the list; adjust number of active pipes
-    //  accordingly.
+    //  Remove the pipe from the list; adjust number of active and/or
+    //  eligible pipes accordingly.
     if (pipes.index (pipe_) < active)
         active--;
+    if (pipes.index (pipe_) < eligible)
+        eligible--;
     pipes.erase (pipe_);
 
-    if (terminating)
+    if (unlikely (terminating))
         sink->unregister_term_ack ();
 }
 
 void zmq::dist_t::activated (writer_t *pipe_)
 {
-    //  Move the pipe to the list of active pipes.
-    pipes.swap (pipes.index (pipe_), active);
-    active++;
+    //  If we are in the middle of sending a message, we'll add the pipe
+    //  into the list of eligible pipes. Otherwise we add it to the list
+    //  of active pipes.
+    if (more) {
+        pipes.swap (pipes.index (pipe_), eligible);
+        eligible++;
+    }
+    else {
+        pipes.swap (pipes.index (pipe_), active);
+        active++;
+        eligible++;
+    }
 }
 
 int zmq::dist_t::send (msg_t *msg_, int flags_)
@@ -95,9 +112,9 @@ int zmq::dist_t::send (msg_t *msg_, int flags_)
     //  Push the message to active pipes.
     distribute (msg_, flags_);
 
-    //  If mutlipart message is fully sent, activate new pipes.
-    if (more && !msg_more)
-        clear_new_pipes ();
+    //  If mutlipart message is fully sent, activate all the eligible pipes.
+    if (!msg_more)
+        active = eligible;
 
     more = msg_more;
 
@@ -141,8 +158,10 @@ bool zmq::dist_t::has_out ()
 bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_)
 {
     if (!pipe_->write (msg_)) {
+        pipes.swap (pipes.index (pipe_), active - 1);
         active--;
-        pipes.swap (pipes.index (pipe_), active);
+        pipes.swap (active, eligible - 1);
+        eligible--;
         return false;
     }
     if (!(msg_->flags () & msg_t::more))
@@ -150,15 +169,3 @@ bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_)
     return true;
 }
 
-void zmq::dist_t::clear_new_pipes ()
-{
-    for (new_pipes_t::iterator it = new_pipes.begin (); it != new_pipes.end ();
-          ++it) {
-        (*it)->set_event_sink (this);
-        pipes.push_back (*it);
-        pipes.swap (active, pipes.size () - 1);
-        active++;
-    }
-    new_pipes.clear ();
-}
-
diff --git a/src/dist.hpp b/src/dist.hpp
index ea05305..fd522b9 100644
--- a/src/dist.hpp
+++ b/src/dist.hpp
@@ -56,24 +56,22 @@ namespace zmq
         //  Put the message to all active pipes.
         void distribute (class msg_t *msg_, int flags_);
 
-        //  Plug in all the delayed pipes.
-        void clear_new_pipes ();
-
         //  List of outbound pipes.
         typedef array_t <class writer_t> pipes_t;
         pipes_t pipes;
 
-        //  List of new pipes that were not yet inserted into 'pipes' list.
-        //  These pipes are moves to 'pipes' list once the current multipart
-        //  message is fully sent. This way we avoid sending incomplete messages
-        //  to peers.
-        typedef std::vector <class writer_t*> new_pipes_t;
-        new_pipes_t new_pipes;
-
         //  Number of active pipes. All the active pipes are located at the
-        //  beginning of the pipes array.
+        //  beginning of the pipes array. These are the pipes the messages
+        //  can be sent to at the moment.
         pipes_t::size_type active;
 
+        //  Number of pipes eligible for sending messages to. This includes all
+        //  the active pipes plus all the pipes that we can in theory send
+        //  messages to (the HWM is not yet reached), but sending a message
+        //  to them would result in partial message being delivered, ie. message
+        //  with initial parts missing.
+        pipes_t::size_type eligible;
+
         //  True if last we are in the middle of a multipart message.
         bool more;
 
-- 
cgit v1.2.3