diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/lb.cpp | 25 | ||||
-rw-r--r-- | src/lb.hpp | 3 |
2 files changed, 22 insertions, 6 deletions
@@ -25,7 +25,8 @@ zmq::lb_t::lb_t () : active (0), - current (0) + current (0), + tbc (false) { } @@ -44,6 +45,8 @@ void zmq::lb_t::attach (writer_t *pipe_) void zmq::lb_t::detach (writer_t *pipe_) { + zmq_assert (!tbc || pipes [current] != pipe_); + // Remove the pipe from the list; adjust number of active pipes // accordingly. if (pipes.index (pipe_) < active) { @@ -64,9 +67,12 @@ void zmq::lb_t::revive (writer_t *pipe_) int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) { while (active > 0) { - if (pipes [current]->write (msg_)) + if (pipes [current]->write (msg_)) { + tbc = msg_->flags & ZMQ_MSG_TBC; break; + } + zmq_assert (!tbc); active--; if (current < active) pipes.swap (current, active); @@ -80,20 +86,27 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) return -1; } - pipes [current]->flush (); + // If it's final part of the message we can fluch it downstream and + // continue round-robinning (load balance). + if (!tbc) { + pipes [current]->flush (); + current = (current + 1) % active; + } // Detach the message from the data buffer. int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); - // Move to the next pipe (load-balancing). - current = (current + 1) % active; - return 0; } bool zmq::lb_t::has_out () { + // If one part of the message was already written we can definitely + // write the rest of the message. + if (tbc) + return true; + while (active > 0) { if (pipes [current]->check_write ()) return true; @@ -53,6 +53,9 @@ namespace zmq // Points to the last pipe that the most recent message was sent to. pipes_t::size_type current; + // True if last we are in the middle of a multipart message. + bool tbc; + lb_t (const lb_t&); void operator = (const lb_t&); }; |