From 1619b3d84a04fe1886347fd83280a6070c9603f4 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 20 Mar 2011 20:52:54 +0100 Subject: Message atomicity bug in load-balancer fixed If the peer getting the message have disconnected in the middle of multiplart message, the remaining part of the message went to a different peer. This patch fixes the issue. Signed-off-by: Martin Sustrik --- src/lb.cpp | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) (limited to 'src/lb.cpp') diff --git a/src/lb.cpp b/src/lb.cpp index 647ff5a..95af4a1 100644 --- a/src/lb.cpp +++ b/src/lb.cpp @@ -29,6 +29,7 @@ zmq::lb_t::lb_t (own_t *sink_) : active (0), current (0), more (false), + dropping (false), sink (sink_), terminating (false) { @@ -65,9 +66,16 @@ void zmq::lb_t::terminate () void zmq::lb_t::terminated (writer_t *pipe_) { + pipes_t::size_type index = pipes.index (pipe_); + + // If we are in the middle of multipart message and current pipe + // have disconnected, we have to drop the remainder of the message. + if (index == current && more) + dropping = true; + // Remove the pipe from the list; adjust number of active pipes // accordingly. - if (pipes.index (pipe_) < active) { + if (index < active) { active--; if (current == active) current = 0; @@ -87,6 +95,21 @@ void zmq::lb_t::activated (writer_t *pipe_) int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) { + // Drop the message if required. If we are at the end of the message + // switch back to non-dropping mode. + if (dropping) { + + more = msg_->flags & ZMQ_MSG_MORE; + if (!more) + dropping = false; + + int rc = zmq_msg_close (msg_); + errno_assert (rc == 0); + rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return 0; + } + while (active > 0) { if (pipes [current]->write (msg_)) { more = msg_->flags & ZMQ_MSG_MORE; -- cgit v1.2.3