summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lb.cpp25
-rw-r--r--src/lb.hpp3
2 files changed, 22 insertions, 6 deletions
diff --git a/src/lb.cpp b/src/lb.cpp
index e282d24..b497b11 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -25,7 +25,8 @@
zmq::lb_t::lb_t () :
active (0),
- current (0)
+ current (0),
+ tbc (false)
{
}
@@ -44,6 +45,8 @@ void zmq::lb_t::attach (writer_t *pipe_)
void zmq::lb_t::detach (writer_t *pipe_)
{
+ zmq_assert (!tbc || pipes [current] != pipe_);
+
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
if (pipes.index (pipe_) < active) {
@@ -64,9 +67,12 @@ void zmq::lb_t::revive (writer_t *pipe_)
int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
{
while (active > 0) {
- if (pipes [current]->write (msg_))
+ if (pipes [current]->write (msg_)) {
+ tbc = msg_->flags & ZMQ_MSG_TBC;
break;
+ }
+ zmq_assert (!tbc);
active--;
if (current < active)
pipes.swap (current, active);
@@ -80,20 +86,27 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
return -1;
}
- pipes [current]->flush ();
+ // If it's final part of the message we can fluch it downstream and
+ // continue round-robinning (load balance).
+ if (!tbc) {
+ pipes [current]->flush ();
+ current = (current + 1) % active;
+ }
// Detach the message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
- // Move to the next pipe (load-balancing).
- current = (current + 1) % active;
-
return 0;
}
bool zmq::lb_t::has_out ()
{
+ // If one part of the message was already written we can definitely
+ // write the rest of the message.
+ if (tbc)
+ return true;
+
while (active > 0) {
if (pipes [current]->check_write ())
return true;
diff --git a/src/lb.hpp b/src/lb.hpp
index 5bddc1e..79f83c5 100644
--- a/src/lb.hpp
+++ b/src/lb.hpp
@@ -53,6 +53,9 @@ namespace zmq
// Points to the last pipe that the most recent message was sent to.
pipes_t::size_type current;
+ // True if last we are in the middle of a multipart message.
+ bool tbc;
+
lb_t (const lb_t&);
void operator = (const lb_t&);
};