From bbfac783f91f6692b7f9c0aa5392ac955f7b49bf Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 27 Mar 2010 09:43:49 +0100 Subject: multi-part message work with UPSTREAM/DOWNSTREAM --- src/lb.cpp | 25 +++++++++++++++++++------ src/lb.hpp | 3 +++ 2 files changed, 22 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/lb.cpp b/src/lb.cpp index e282d24..b497b11 100644 --- a/src/lb.cpp +++ b/src/lb.cpp @@ -25,7 +25,8 @@ zmq::lb_t::lb_t () : active (0), - current (0) + current (0), + tbc (false) { } @@ -44,6 +45,8 @@ void zmq::lb_t::attach (writer_t *pipe_) void zmq::lb_t::detach (writer_t *pipe_) { + zmq_assert (!tbc || pipes [current] != pipe_); + // Remove the pipe from the list; adjust number of active pipes // accordingly. if (pipes.index (pipe_) < active) { @@ -64,9 +67,12 @@ void zmq::lb_t::revive (writer_t *pipe_) int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) { while (active > 0) { - if (pipes [current]->write (msg_)) + if (pipes [current]->write (msg_)) { + tbc = msg_->flags & ZMQ_MSG_TBC; break; + } + zmq_assert (!tbc); active--; if (current < active) pipes.swap (current, active); @@ -80,20 +86,27 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) return -1; } - pipes [current]->flush (); + // If it's final part of the message we can fluch it downstream and + // continue round-robinning (load balance). + if (!tbc) { + pipes [current]->flush (); + current = (current + 1) % active; + } // Detach the message from the data buffer. int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); - // Move to the next pipe (load-balancing). - current = (current + 1) % active; - return 0; } bool zmq::lb_t::has_out () { + // If one part of the message was already written we can definitely + // write the rest of the message. + if (tbc) + return true; + while (active > 0) { if (pipes [current]->check_write ()) return true; diff --git a/src/lb.hpp b/src/lb.hpp index 5bddc1e..79f83c5 100644 --- a/src/lb.hpp +++ b/src/lb.hpp @@ -53,6 +53,9 @@ namespace zmq // Points to the last pipe that the most recent message was sent to. pipes_t::size_type current; + // True if last we are in the middle of a multipart message. + bool tbc; + lb_t (const lb_t&); void operator = (const lb_t&); }; -- cgit v1.2.3