diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2010-03-27 09:43:49 +0100 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-03-27 09:43:49 +0100 |
commit | bbfac783f91f6692b7f9c0aa5392ac955f7b49bf (patch) | |
tree | 40b9b7cff4e755770da71944af7dea200281f11c /src | |
parent | ed291b02516ac5c9fe01f328d505305d36fe6319 (diff) |
multi-part message work with UPSTREAM/DOWNSTREAM
Diffstat (limited to 'src')
-rw-r--r-- | src/lb.cpp | 25 | ||||
-rw-r--r-- | src/lb.hpp | 3 |
2 files changed, 22 insertions, 6 deletions
@@ -25,7 +25,8 @@ zmq::lb_t::lb_t () : active (0), - current (0) + current (0), + tbc (false) { } @@ -44,6 +45,8 @@ void zmq::lb_t::attach (writer_t *pipe_) void zmq::lb_t::detach (writer_t *pipe_) { + zmq_assert (!tbc || pipes [current] != pipe_); + // Remove the pipe from the list; adjust number of active pipes // accordingly. if (pipes.index (pipe_) < active) { @@ -64,9 +67,12 @@ void zmq::lb_t::revive (writer_t *pipe_) int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) { while (active > 0) { - if (pipes [current]->write (msg_)) + if (pipes [current]->write (msg_)) { + tbc = msg_->flags & ZMQ_MSG_TBC; break; + } + zmq_assert (!tbc); active--; if (current < active) pipes.swap (current, active); @@ -80,20 +86,27 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) return -1; } - pipes [current]->flush (); + // If it's final part of the message we can fluch it downstream and + // continue round-robinning (load balance). + if (!tbc) { + pipes [current]->flush (); + current = (current + 1) % active; + } // Detach the message from the data buffer. int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); - // Move to the next pipe (load-balancing). - current = (current + 1) % active; - return 0; } bool zmq::lb_t::has_out () { + // If one part of the message was already written we can definitely + // write the rest of the message. + if (tbc) + return true; + while (active > 0) { if (pipes [current]->check_write ()) return true; @@ -53,6 +53,9 @@ namespace zmq // Points to the last pipe that the most recent message was sent to. pipes_t::size_type current; + // True if last we are in the middle of a multipart message. + bool tbc; + lb_t (const lb_t&); void operator = (const lb_t&); }; |