From bd86def1c799a35d5cef0c0a9a1347a18fea227e Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 11 Jun 2011 20:29:56 +0200 Subject: Actual message filtering happens in XPUB socket Signed-off-by: Martin Sustrik --- src/dist.cpp | 47 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 9 deletions(-) (limited to 'src/dist.cpp') diff --git a/src/dist.cpp b/src/dist.cpp index f7f0488..707b9c1 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -25,6 +25,7 @@ #include "likely.hpp" zmq::dist_t::dist_t () : + matching (0), active (0), eligible (0), more (false) @@ -54,10 +55,27 @@ void zmq::dist_t::attach (pipe_t *pipe_) } } +void zmq::dist_t::match (pipe_t *pipe_) +{ + // If pipe is already matching do nothing. + if (pipes.index (pipe_) < matching) + return; + + // If the pipe isn't eligible, ignore it. + if (pipes.index (pipe_) >= eligible) + return; + + // Mark the pipe as matching. + pipes.swap (pipes.index (pipe_), matching); + matching++; +} + void zmq::dist_t::terminated (pipe_t *pipe_) { - // Remove the pipe from the list; adjust number of active and/or + // Remove the pipe from the list; adjust number of matching, active and/or // eligible pipes accordingly. + if (pipes.index (pipe_) < matching) + matching--; if (pipes.index (pipe_) < active) active--; if (pipes.index (pipe_) < eligible) @@ -79,18 +97,27 @@ void zmq::dist_t::activated (pipe_t *pipe_) } } -int zmq::dist_t::send (msg_t *msg_, int flags_) +int zmq::dist_t::send_to_all (msg_t *msg_, int flags_) +{ + matching = active; + return send_to_matching (msg_, flags_); +} + +int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_) { // Is this end of a multipart message? bool msg_more = msg_->flags () & msg_t::more; - // Push the message to active pipes. + // Push the message to matching pipes. distribute (msg_, flags_); // If mutlipart message is fully sent, activate all the eligible pipes. if (!msg_more) active = eligible; + // Mark all the pipes as non-matching. + matching = 0; + more = msg_more; return 0; @@ -98,8 +125,8 @@ int zmq::dist_t::send (msg_t *msg_, int flags_) void zmq::dist_t::distribute (msg_t *msg_, int flags_) { - // If there are no active pipes available, simply drop the message. - if (active == 0) { + // If there are no matching pipes available, simply drop the message. + if (matching == 0) { int rc = msg_->close (); errno_assert (rc == 0); rc = msg_->init (); @@ -107,12 +134,12 @@ void zmq::dist_t::distribute (msg_t *msg_, int flags_) return; } - // Add active-1 references to the message. We already hold one reference, + // Add matching-1 references to the message. We already hold one reference, // that's why -1. - msg_->add_refs (active - 1); + msg_->add_refs (matching - 1); - // Push copy of the message to each active pipe. - for (pipes_t::size_type i = 0; i < active;) { + // Push copy of the message to each matching pipe. + for (pipes_t::size_type i = 0; i < matching;) { if (!write (pipes [i], msg_)) msg_->rm_refs (1); else @@ -133,6 +160,8 @@ bool zmq::dist_t::has_out () bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_) { if (!pipe_->write (msg_)) { + pipes.swap (pipes.index (pipe_), matching - 1); + matching--; pipes.swap (pipes.index (pipe_), active - 1); active--; pipes.swap (active, eligible - 1); -- cgit v1.2.3