diff options
| -rw-r--r-- | src/lb.cpp | 25 | ||||
| -rw-r--r-- | src/lb.hpp | 3 | 
2 files changed, 27 insertions, 1 deletions
@@ -29,6 +29,7 @@ zmq::lb_t::lb_t (own_t *sink_) :      active (0),      current (0),      more (false), +    dropping (false),      sink (sink_),      terminating (false)  { @@ -65,9 +66,16 @@ void zmq::lb_t::terminate ()  void zmq::lb_t::terminated (writer_t *pipe_)  { +    pipes_t::size_type index = pipes.index (pipe_); + +    //  If we are in the middle of multipart message and current pipe +    //  have disconnected, we have to drop the remainder of the message. +    if (index == current && more) +        dropping = true; +      //  Remove the pipe from the list; adjust number of active pipes      //  accordingly. -    if (pipes.index (pipe_) < active) { +    if (index < active) {          active--;          if (current == active)              current = 0; @@ -87,6 +95,21 @@ void zmq::lb_t::activated (writer_t *pipe_)  int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)  { +    //  Drop the message if required. If we are at the end of the message +    //  switch back to non-dropping mode. +    if (dropping) { + +        more = msg_->flags & ZMQ_MSG_MORE; +        if (!more) +            dropping = false; + +        int rc = zmq_msg_close (msg_); +        errno_assert (rc == 0); +        rc = zmq_msg_init (msg_); +        zmq_assert (rc == 0); +        return 0; +    } +      while (active > 0) {          if (pipes [current]->write (msg_)) {              more = msg_->flags & ZMQ_MSG_MORE; @@ -61,6 +61,9 @@ namespace zmq          //  True if last we are in the middle of a multipart message.          bool more; +        //  True if we are dropping current message. +        bool dropping; +          //  Object to send events to.          class own_t *sink;  | 
