From bbfac783f91f6692b7f9c0aa5392ac955f7b49bf Mon Sep 17 00:00:00 2001
From: Martin Sustrik <sustrik@250bpm.com>
Date: Sat, 27 Mar 2010 09:43:49 +0100
Subject: multi-part message work with UPSTREAM/DOWNSTREAM

---
 src/lb.cpp | 25 +++++++++++++++++++------
 src/lb.hpp |  3 +++
 2 files changed, 22 insertions(+), 6 deletions(-)

(limited to 'src')

diff --git a/src/lb.cpp b/src/lb.cpp
index e282d24..b497b11 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -25,7 +25,8 @@
 
 zmq::lb_t::lb_t () :
     active (0),
-    current (0)
+    current (0),
+    tbc (false)
 {
 }
 
@@ -44,6 +45,8 @@ void zmq::lb_t::attach (writer_t *pipe_)
 
 void zmq::lb_t::detach (writer_t *pipe_)
 {
+    zmq_assert (!tbc || pipes [current] != pipe_);
+
     //  Remove the pipe from the list; adjust number of active pipes
     //  accordingly.
     if (pipes.index (pipe_) < active) {
@@ -64,9 +67,12 @@ void zmq::lb_t::revive (writer_t *pipe_)
 int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
 {
     while (active > 0) {
-        if (pipes [current]->write (msg_))
+        if (pipes [current]->write (msg_)) {
+            tbc = msg_->flags & ZMQ_MSG_TBC;
             break;
+        }
 
+        zmq_assert (!tbc);
         active--;
         if (current < active)
             pipes.swap (current, active);
@@ -80,20 +86,27 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
         return -1;
     }
 
-    pipes [current]->flush ();
+    //  If it's final part of the message we can fluch it downstream and
+    //  continue round-robinning (load balance).
+    if (!tbc) {
+        pipes [current]->flush ();
+        current = (current + 1) % active;
+    }
 
     //  Detach the message from the data buffer.
     int rc = zmq_msg_init (msg_);
     zmq_assert (rc == 0);
 
-    //  Move to the next pipe (load-balancing).
-    current = (current + 1) % active;
-
     return 0;
 }
 
 bool zmq::lb_t::has_out ()
 {
+    //  If one part of the message was already written we can definitely
+    //  write the rest of the message.
+    if (tbc)
+        return true;
+
     while (active > 0) {
         if (pipes [current]->check_write ())
             return true;
diff --git a/src/lb.hpp b/src/lb.hpp
index 5bddc1e..79f83c5 100644
--- a/src/lb.hpp
+++ b/src/lb.hpp
@@ -53,6 +53,9 @@ namespace zmq
         //  Points to the last pipe that the most recent message was sent to.
         pipes_t::size_type current;
 
+        //  True if last we are in the middle of a multipart message.
+        bool tbc;
+
         lb_t (const lb_t&);
         void operator = (const lb_t&);
     };
-- 
cgit v1.2.3