From ed291b02516ac5c9fe01f328d505305d36fe6319 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 27 Mar 2010 09:24:38 +0100 Subject: multi-part messages work with PUB/SUB --- src/fq.cpp | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) (limited to 'src/fq.cpp') diff --git a/src/fq.cpp b/src/fq.cpp index a7e93ce..3ab1d32 100644 --- a/src/fq.cpp +++ b/src/fq.cpp @@ -25,7 +25,8 @@ zmq::fq_t::fq_t () : active (0), - current (0) + current (0), + tbc (false) { } @@ -44,6 +45,8 @@ void zmq::fq_t::attach (reader_t *pipe_) void zmq::fq_t::detach (reader_t *pipe_) { + zmq_assert (!tbc || pipes [current] != pipe_); + // Remove the pipe from the list; adjust number of active pipes // accordingly. if (pipes.index (pipe_) < active) { @@ -75,14 +78,26 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_) // Deallocate old content of the message. zmq_msg_close (msg_); - // Round-robin over the pipes to get next message. + // Round-robin over the pipes to get the next message. for (int count = active; count != 0; count--) { + + // Try to fetch new message. If we've already read part of the message + // subsequent part should be immediately available. bool fetched = pipes [current]->read (msg_); - current++; - if (current >= active) - current = 0; - if (fetched) + zmq_assert (!(tbc && !fetched)); + + // Note that when message is not fetched, current pipe is killed and + // replaced by another active pipe. Thus we don't have to increase + // the 'current' pointer. + if (fetched) { + tbc = msg_->flags & ZMQ_MSG_TBC; + if (!tbc) { + current++; + if (current >= active) + current = 0; + } return 0; + } } // No message is available. Initialise the output parameter @@ -94,6 +109,10 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_) bool zmq::fq_t::has_in () { + // There are subsequent parts of the partly-read message available. + if (tbc) + return true; + // Note that messing with current doesn't break the fairness of fair // queueing algorithm. If there are no messages available current will // get back to its original value. Otherwise it'll point to the first -- cgit v1.2.3