From e080e3e8b620b0e7ed02c28712a0c92b08de3451 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 12 Jun 2011 10:19:21 +0200 Subject: Publisher-side filtering for multi-part messages fixed Signed-off-by: Martin Sustrik --- src/dist.cpp | 8 +++++--- src/dist.hpp | 8 +++++--- src/xpub.cpp | 25 ++++++++++++++++++++----- src/xpub.hpp | 3 +++ 4 files changed, 33 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/dist.cpp b/src/dist.cpp index 707b9c1..15bd168 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -70,6 +70,11 @@ void zmq::dist_t::match (pipe_t *pipe_) matching++; } +void zmq::dist_t::unmatch () +{ + matching = 0; +} + void zmq::dist_t::terminated (pipe_t *pipe_) { // Remove the pipe from the list; adjust number of matching, active and/or @@ -115,9 +120,6 @@ int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_) if (!msg_more) active = eligible; - // Mark all the pipes as non-matching. - matching = 0; - more = msg_more; return 0; diff --git a/src/dist.hpp b/src/dist.hpp index 005bb60..c8d121c 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -48,14 +48,16 @@ namespace zmq // will send message also to this pipe. void match (class pipe_t *pipe_); + // Mark all pipes as non-matching. + void unmatch (); + // Removes the pipe from the distributor object. void terminated (class pipe_t *pipe_); - // Send the message to all the outbound pipes. After the call all the - // pipes are marked as non-matching. + // Send the message to the matching outbound pipes. int send_to_matching (class msg_t *msg_, int flags_); - // Send the message to the matching outbound pipes. + // Send the message to all the outbound pipes. int send_to_all (class msg_t *msg_, int flags_); bool has_out (); diff --git a/src/xpub.cpp b/src/xpub.cpp index 9078de3..a102b68 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -26,7 +26,8 @@ #include "msg.hpp" zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_) + socket_base_t (parent_, tid_), + more (false) { options.type = ZMQ_XPUB; } @@ -99,13 +100,27 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) int zmq::xpub_t::xsend (msg_t *msg_, int flags_) { - // Find the matching pipes. - subscriptions.match ((unsigned char*) msg_->data (), msg_->size (), - mark_as_matching, this); + bool msg_more = msg_->flags () & msg_t::more; + + // For the first part of multi-part message, find the matching pipes. + if (!more) + subscriptions.match ((unsigned char*) msg_->data (), msg_->size (), + mark_as_matching, this); // Send the message to all the pipes that were marked as matching // in the previous step. - return dist.send_to_matching (msg_, flags_); + int rc = dist.send_to_matching (msg_, flags_); + if (rc != 0) + return rc; + + // If we are at the end of multi-part message we can mark all the pipes + // as non-matching. + if (!msg_more) + dist.unmatch (); + + more = msg_more; + + return 0; } bool zmq::xpub_t::xhas_out () diff --git a/src/xpub.hpp b/src/xpub.hpp index 740d1e2..a2e7335 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -66,6 +66,9 @@ namespace zmq // Distributor of messages holding the list of outbound pipes. dist_t dist; + // True if we are in the middle of sending a multi-part message. + bool more; + // List of pending (un)subscriptions, ie. those that were already // applied to the trie, but not yet received by the user. typedef std::deque pending_t; -- cgit v1.2.3