summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/dist.cpp8
-rw-r--r--src/dist.hpp8
-rw-r--r--src/xpub.cpp25
-rw-r--r--src/xpub.hpp3
4 files changed, 33 insertions, 11 deletions
diff --git a/src/dist.cpp b/src/dist.cpp
index 707b9c1..15bd168 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -70,6 +70,11 @@ void zmq::dist_t::match (pipe_t *pipe_)
matching++;
}
+void zmq::dist_t::unmatch ()
+{
+ matching = 0;
+}
+
void zmq::dist_t::terminated (pipe_t *pipe_)
{
// Remove the pipe from the list; adjust number of matching, active and/or
@@ -115,9 +120,6 @@ int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
if (!msg_more)
active = eligible;
- // Mark all the pipes as non-matching.
- matching = 0;
-
more = msg_more;
return 0;
diff --git a/src/dist.hpp b/src/dist.hpp
index 005bb60..c8d121c 100644
--- a/src/dist.hpp
+++ b/src/dist.hpp
@@ -48,14 +48,16 @@ namespace zmq
// will send message also to this pipe.
void match (class pipe_t *pipe_);
+ // Mark all pipes as non-matching.
+ void unmatch ();
+
// Removes the pipe from the distributor object.
void terminated (class pipe_t *pipe_);
- // Send the message to all the outbound pipes. After the call all the
- // pipes are marked as non-matching.
+ // Send the message to the matching outbound pipes.
int send_to_matching (class msg_t *msg_, int flags_);
- // Send the message to the matching outbound pipes.
+ // Send the message to all the outbound pipes.
int send_to_all (class msg_t *msg_, int flags_);
bool has_out ();
diff --git a/src/xpub.cpp b/src/xpub.cpp
index 9078de3..a102b68 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -26,7 +26,8 @@
#include "msg.hpp"
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
- socket_base_t (parent_, tid_)
+ socket_base_t (parent_, tid_),
+ more (false)
{
options.type = ZMQ_XPUB;
}
@@ -99,13 +100,27 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
{
- // Find the matching pipes.
- subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
- mark_as_matching, this);
+ bool msg_more = msg_->flags () & msg_t::more;
+
+ // For the first part of multi-part message, find the matching pipes.
+ if (!more)
+ subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
+ mark_as_matching, this);
// Send the message to all the pipes that were marked as matching
// in the previous step.
- return dist.send_to_matching (msg_, flags_);
+ int rc = dist.send_to_matching (msg_, flags_);
+ if (rc != 0)
+ return rc;
+
+ // If we are at the end of multi-part message we can mark all the pipes
+ // as non-matching.
+ if (!msg_more)
+ dist.unmatch ();
+
+ more = msg_more;
+
+ return 0;
}
bool zmq::xpub_t::xhas_out ()
diff --git a/src/xpub.hpp b/src/xpub.hpp
index 740d1e2..a2e7335 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -66,6 +66,9 @@ namespace zmq
// Distributor of messages holding the list of outbound pipes.
dist_t dist;
+ // True if we are in the middle of sending a multi-part message.
+ bool more;
+
// List of pending (un)subscriptions, ie. those that were already
// applied to the trie, but not yet received by the user.
typedef std::deque <blob_t> pending_t;