diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-06-12 10:19:21 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-06-12 10:19:21 +0200 |
commit | e080e3e8b620b0e7ed02c28712a0c92b08de3451 (patch) | |
tree | 7e7aedfca8dd91e84d0825d251d122d3fe4ce7b3 /src/xpub.cpp | |
parent | bd86def1c799a35d5cef0c0a9a1347a18fea227e (diff) |
Publisher-side filtering for multi-part messages fixed
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xpub.cpp')
-rw-r--r-- | src/xpub.cpp | 25 |
1 files changed, 20 insertions, 5 deletions
diff --git a/src/xpub.cpp b/src/xpub.cpp index 9078de3..a102b68 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -26,7 +26,8 @@ #include "msg.hpp" zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_) + socket_base_t (parent_, tid_), + more (false) { options.type = ZMQ_XPUB; } @@ -99,13 +100,27 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) int zmq::xpub_t::xsend (msg_t *msg_, int flags_) { - // Find the matching pipes. - subscriptions.match ((unsigned char*) msg_->data (), msg_->size (), - mark_as_matching, this); + bool msg_more = msg_->flags () & msg_t::more; + + // For the first part of multi-part message, find the matching pipes. + if (!more) + subscriptions.match ((unsigned char*) msg_->data (), msg_->size (), + mark_as_matching, this); // Send the message to all the pipes that were marked as matching // in the previous step. - return dist.send_to_matching (msg_, flags_); + int rc = dist.send_to_matching (msg_, flags_); + if (rc != 0) + return rc; + + // If we are at the end of multi-part message we can mark all the pipes + // as non-matching. + if (!msg_more) + dist.unmatch (); + + more = msg_more; + + return 0; } bool zmq::xpub_t::xhas_out () |