diff options
| -rw-r--r-- | src/lb.cpp | 25 | ||||
| -rw-r--r-- | src/lb.hpp | 3 | 
2 files changed, 22 insertions, 6 deletions
@@ -25,7 +25,8 @@  zmq::lb_t::lb_t () :      active (0), -    current (0) +    current (0), +    tbc (false)  {  } @@ -44,6 +45,8 @@ void zmq::lb_t::attach (writer_t *pipe_)  void zmq::lb_t::detach (writer_t *pipe_)  { +    zmq_assert (!tbc || pipes [current] != pipe_); +      //  Remove the pipe from the list; adjust number of active pipes      //  accordingly.      if (pipes.index (pipe_) < active) { @@ -64,9 +67,12 @@ void zmq::lb_t::revive (writer_t *pipe_)  int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)  {      while (active > 0) { -        if (pipes [current]->write (msg_)) +        if (pipes [current]->write (msg_)) { +            tbc = msg_->flags & ZMQ_MSG_TBC;              break; +        } +        zmq_assert (!tbc);          active--;          if (current < active)              pipes.swap (current, active); @@ -80,20 +86,27 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)          return -1;      } -    pipes [current]->flush (); +    //  If it's final part of the message we can fluch it downstream and +    //  continue round-robinning (load balance). +    if (!tbc) { +        pipes [current]->flush (); +        current = (current + 1) % active; +    }      //  Detach the message from the data buffer.      int rc = zmq_msg_init (msg_);      zmq_assert (rc == 0); -    //  Move to the next pipe (load-balancing). -    current = (current + 1) % active; -      return 0;  }  bool zmq::lb_t::has_out ()  { +    //  If one part of the message was already written we can definitely +    //  write the rest of the message. +    if (tbc) +        return true; +      while (active > 0) {          if (pipes [current]->check_write ())              return true; @@ -53,6 +53,9 @@ namespace zmq          //  Points to the last pipe that the most recent message was sent to.          pipes_t::size_type current; +        //  True if last we are in the middle of a multipart message. +        bool tbc; +          lb_t (const lb_t&);          void operator = (const lb_t&);      };  | 
