From 1619b3d84a04fe1886347fd83280a6070c9603f4 Mon Sep 17 00:00:00 2001
From: Martin Sustrik <sustrik@250bpm.com>
Date: Sun, 20 Mar 2011 20:52:54 +0100
Subject: Message atomicity bug in load-balancer fixed

If the peer getting the message have disconnected in the middle
of multiplart message, the remaining part of the message went
to a different peer. This patch fixes the issue.

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
---
 src/lb.cpp | 25 ++++++++++++++++++++++++-
 src/lb.hpp |  3 +++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/src/lb.cpp b/src/lb.cpp
index 647ff5a..95af4a1 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -29,6 +29,7 @@ zmq::lb_t::lb_t (own_t *sink_) :
     active (0),
     current (0),
     more (false),
+    dropping (false),
     sink (sink_),
     terminating (false)
 {
@@ -65,9 +66,16 @@ void zmq::lb_t::terminate ()
 
 void zmq::lb_t::terminated (writer_t *pipe_)
 {
+    pipes_t::size_type index = pipes.index (pipe_);
+
+    //  If we are in the middle of multipart message and current pipe
+    //  have disconnected, we have to drop the remainder of the message.
+    if (index == current && more)
+        dropping = true;
+
     //  Remove the pipe from the list; adjust number of active pipes
     //  accordingly.
-    if (pipes.index (pipe_) < active) {
+    if (index < active) {
         active--;
         if (current == active)
             current = 0;
@@ -87,6 +95,21 @@ void zmq::lb_t::activated (writer_t *pipe_)
 
 int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
 {
+    //  Drop the message if required. If we are at the end of the message
+    //  switch back to non-dropping mode.
+    if (dropping) {
+
+        more = msg_->flags & ZMQ_MSG_MORE;
+        if (!more)
+            dropping = false;
+
+        int rc = zmq_msg_close (msg_);
+        errno_assert (rc == 0);
+        rc = zmq_msg_init (msg_);
+        zmq_assert (rc == 0);
+        return 0;
+    }
+
     while (active > 0) {
         if (pipes [current]->write (msg_)) {
             more = msg_->flags & ZMQ_MSG_MORE;
diff --git a/src/lb.hpp b/src/lb.hpp
index e8d9bed..0dc11e2 100644
--- a/src/lb.hpp
+++ b/src/lb.hpp
@@ -61,6 +61,9 @@ namespace zmq
         //  True if last we are in the middle of a multipart message.
         bool more;
 
+        //  True if we are dropping current message.
+        bool dropping;
+
         //  Object to send events to.
         class own_t *sink;
 
-- 
cgit v1.2.3