summaryrefslogtreecommitdiff
path: root/src/lb.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-03-20 20:52:54 +0100
committerMartin Sustrik <sustrik@250bpm.com>2011-03-20 20:52:54 +0100
commit1619b3d84a04fe1886347fd83280a6070c9603f4 (patch)
tree817536a0e500227dcf1463f68d88375cd75dd9ec /src/lb.cpp
parent92c7c18367f91c6341fc617026f5e25000466b05 (diff)
Message atomicity bug in load-balancer fixed
If the peer getting the message have disconnected in the middle of multiplart message, the remaining part of the message went to a different peer. This patch fixes the issue. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/lb.cpp')
-rw-r--r--src/lb.cpp25
1 files changed, 24 insertions, 1 deletions
diff --git a/src/lb.cpp b/src/lb.cpp
index 647ff5a..95af4a1 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -29,6 +29,7 @@ zmq::lb_t::lb_t (own_t *sink_) :
active (0),
current (0),
more (false),
+ dropping (false),
sink (sink_),
terminating (false)
{
@@ -65,9 +66,16 @@ void zmq::lb_t::terminate ()
void zmq::lb_t::terminated (writer_t *pipe_)
{
+ pipes_t::size_type index = pipes.index (pipe_);
+
+ // If we are in the middle of multipart message and current pipe
+ // have disconnected, we have to drop the remainder of the message.
+ if (index == current && more)
+ dropping = true;
+
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
- if (pipes.index (pipe_) < active) {
+ if (index < active) {
active--;
if (current == active)
current = 0;
@@ -87,6 +95,21 @@ void zmq::lb_t::activated (writer_t *pipe_)
int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
{
+ // Drop the message if required. If we are at the end of the message
+ // switch back to non-dropping mode.
+ if (dropping) {
+
+ more = msg_->flags & ZMQ_MSG_MORE;
+ if (!more)
+ dropping = false;
+
+ int rc = zmq_msg_close (msg_);
+ errno_assert (rc == 0);
+ rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
while (active > 0) {
if (pipes [current]->write (msg_)) {
more = msg_->flags & ZMQ_MSG_MORE;