summaryrefslogtreecommitdiff
path: root/src/xpub.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-06-12 10:19:21 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-06-12 10:19:21 +0200
commite080e3e8b620b0e7ed02c28712a0c92b08de3451 (patch)
tree7e7aedfca8dd91e84d0825d251d122d3fe4ce7b3 /src/xpub.cpp
parentbd86def1c799a35d5cef0c0a9a1347a18fea227e (diff)
Publisher-side filtering for multi-part messages fixed
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xpub.cpp')
-rw-r--r--src/xpub.cpp25
1 files changed, 20 insertions, 5 deletions
diff --git a/src/xpub.cpp b/src/xpub.cpp
index 9078de3..a102b68 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -26,7 +26,8 @@
#include "msg.hpp"
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
- socket_base_t (parent_, tid_)
+ socket_base_t (parent_, tid_),
+ more (false)
{
options.type = ZMQ_XPUB;
}
@@ -99,13 +100,27 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
{
- // Find the matching pipes.
- subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
- mark_as_matching, this);
+ bool msg_more = msg_->flags () & msg_t::more;
+
+ // For the first part of multi-part message, find the matching pipes.
+ if (!more)
+ subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
+ mark_as_matching, this);
// Send the message to all the pipes that were marked as matching
// in the previous step.
- return dist.send_to_matching (msg_, flags_);
+ int rc = dist.send_to_matching (msg_, flags_);
+ if (rc != 0)
+ return rc;
+
+ // If we are at the end of multi-part message we can mark all the pipes
+ // as non-matching.
+ if (!msg_more)
+ dist.unmatch ();
+
+ more = msg_more;
+
+ return 0;
}
bool zmq::xpub_t::xhas_out ()