summaryrefslogtreecommitdiff
path: root/src/sub.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-03-27 09:24:38 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-03-27 09:24:38 +0100
commited291b02516ac5c9fe01f328d505305d36fe6319 (patch)
tree26c15f812a72bf06a05279e6629c91e7f3c41ebd /src/sub.cpp
parent0b9897b141ae03ccd00132a638d030a2521cf5b3 (diff)
multi-part messages work with PUB/SUB
Diffstat (limited to 'src/sub.cpp')
-rw-r--r--src/sub.cpp27
1 files changed, 25 insertions, 2 deletions
diff --git a/src/sub.cpp b/src/sub.cpp
index e32e198..fd3176f 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -26,7 +26,8 @@
zmq::sub_t::sub_t (class app_thread_t *parent_) :
socket_base_t (parent_),
- has_message (false)
+ has_message (false),
+ tbc (false)
{
options.requires_in = true;
options.requires_out = false;
@@ -105,6 +106,7 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
if (has_message) {
zmq_msg_move (msg_, &message);
has_message = false;
+ tbc = msg_->flags & ZMQ_MSG_TBC;
return 0;
}
@@ -122,13 +124,27 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
return -1;
// Check whether the message matches at least one subscription.
- if (match (msg_))
+ // Non-initial parts of the message are passed
+ if (tbc || match (msg_)) {
+ tbc = msg_->flags & ZMQ_MSG_TBC;
return 0;
+ }
+
+ // Message doesn't match. Pop any remaining parts of the message
+ // from the pipe.
+ while (msg_->flags & ZMQ_MSG_TBC) {
+ rc = fq.recv (msg_, ZMQ_NOBLOCK);
+ zmq_assert (rc == 0);
+ }
}
}
bool zmq::sub_t::xhas_in ()
{
+ // There are subsequent parts of the partly-read message available.
+ if (tbc)
+ return true;
+
// If there's already a message prepared by a previous call to zmq_poll,
// return straight ahead.
if (has_message)
@@ -153,6 +169,13 @@ bool zmq::sub_t::xhas_in ()
has_message = true;
return true;
}
+
+ // Message doesn't match. Pop any remaining parts of the message
+ // from the pipe.
+ while (message.flags & ZMQ_MSG_TBC) {
+ rc = fq.recv (&message, ZMQ_NOBLOCK);
+ zmq_assert (rc == 0);
+ }
}
}