diff options
-rw-r--r-- | src/lb.cpp | 25 | ||||
-rw-r--r-- | src/lb.hpp | 3 |
2 files changed, 27 insertions, 1 deletions
@@ -29,6 +29,7 @@ zmq::lb_t::lb_t (own_t *sink_) : active (0), current (0), more (false), + dropping (false), sink (sink_), terminating (false) { @@ -65,9 +66,16 @@ void zmq::lb_t::terminate () void zmq::lb_t::terminated (writer_t *pipe_) { + pipes_t::size_type index = pipes.index (pipe_); + + // If we are in the middle of multipart message and current pipe + // have disconnected, we have to drop the remainder of the message. + if (index == current && more) + dropping = true; + // Remove the pipe from the list; adjust number of active pipes // accordingly. - if (pipes.index (pipe_) < active) { + if (index < active) { active--; if (current == active) current = 0; @@ -87,6 +95,21 @@ void zmq::lb_t::activated (writer_t *pipe_) int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) { + // Drop the message if required. If we are at the end of the message + // switch back to non-dropping mode. + if (dropping) { + + more = msg_->flags & ZMQ_MSG_MORE; + if (!more) + dropping = false; + + int rc = zmq_msg_close (msg_); + errno_assert (rc == 0); + rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return 0; + } + while (active > 0) { if (pipes [current]->write (msg_)) { more = msg_->flags & ZMQ_MSG_MORE; @@ -61,6 +61,9 @@ namespace zmq // True if last we are in the middle of a multipart message. bool more; + // True if we are dropping current message. + bool dropping; + // Object to send events to. class own_t *sink; |