From ed291b02516ac5c9fe01f328d505305d36fe6319 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 27 Mar 2010 09:24:38 +0100 Subject: multi-part messages work with PUB/SUB --- src/sub.cpp | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) (limited to 'src/sub.cpp') diff --git a/src/sub.cpp b/src/sub.cpp index e32e198..fd3176f 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -26,7 +26,8 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) : socket_base_t (parent_), - has_message (false) + has_message (false), + tbc (false) { options.requires_in = true; options.requires_out = false; @@ -105,6 +106,7 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_) if (has_message) { zmq_msg_move (msg_, &message); has_message = false; + tbc = msg_->flags & ZMQ_MSG_TBC; return 0; } @@ -122,13 +124,27 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_) return -1; // Check whether the message matches at least one subscription. - if (match (msg_)) + // Non-initial parts of the message are passed + if (tbc || match (msg_)) { + tbc = msg_->flags & ZMQ_MSG_TBC; return 0; + } + + // Message doesn't match. Pop any remaining parts of the message + // from the pipe. + while (msg_->flags & ZMQ_MSG_TBC) { + rc = fq.recv (msg_, ZMQ_NOBLOCK); + zmq_assert (rc == 0); + } } } bool zmq::sub_t::xhas_in () { + // There are subsequent parts of the partly-read message available. + if (tbc) + return true; + // If there's already a message prepared by a previous call to zmq_poll, // return straight ahead. if (has_message) @@ -153,6 +169,13 @@ bool zmq::sub_t::xhas_in () has_message = true; return true; } + + // Message doesn't match. Pop any remaining parts of the message + // from the pipe. + while (message.flags & ZMQ_MSG_TBC) { + rc = fq.recv (&message, ZMQ_NOBLOCK); + zmq_assert (rc == 0); + } } } -- cgit v1.2.3