summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-06-11 20:29:56 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-06-11 20:29:56 +0200
commitbd86def1c799a35d5cef0c0a9a1347a18fea227e (patch)
tree4d70d0053ef86b5a614bd19ad4c5f420017279c7
parent3935258b826adc31815be4f91b2f6eb02bb3c8ed (diff)
Actual message filtering happens in XPUB socket
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--src/dist.cpp47
-rw-r--r--src/dist.hpp20
-rw-r--r--src/mtrie.cpp21
-rw-r--r--src/mtrie.hpp11
-rw-r--r--src/xpub.cpp16
-rw-r--r--src/xpub.hpp3
-rw-r--r--src/xsub.cpp4
7 files changed, 99 insertions, 23 deletions
diff --git a/src/dist.cpp b/src/dist.cpp
index f7f0488..707b9c1 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -25,6 +25,7 @@
#include "likely.hpp"
zmq::dist_t::dist_t () :
+ matching (0),
active (0),
eligible (0),
more (false)
@@ -54,10 +55,27 @@ void zmq::dist_t::attach (pipe_t *pipe_)
}
}
+void zmq::dist_t::match (pipe_t *pipe_)
+{
+ // If pipe is already matching do nothing.
+ if (pipes.index (pipe_) < matching)
+ return;
+
+ // If the pipe isn't eligible, ignore it.
+ if (pipes.index (pipe_) >= eligible)
+ return;
+
+ // Mark the pipe as matching.
+ pipes.swap (pipes.index (pipe_), matching);
+ matching++;
+}
+
void zmq::dist_t::terminated (pipe_t *pipe_)
{
- // Remove the pipe from the list; adjust number of active and/or
+ // Remove the pipe from the list; adjust number of matching, active and/or
// eligible pipes accordingly.
+ if (pipes.index (pipe_) < matching)
+ matching--;
if (pipes.index (pipe_) < active)
active--;
if (pipes.index (pipe_) < eligible)
@@ -79,18 +97,27 @@ void zmq::dist_t::activated (pipe_t *pipe_)
}
}
-int zmq::dist_t::send (msg_t *msg_, int flags_)
+int zmq::dist_t::send_to_all (msg_t *msg_, int flags_)
+{
+ matching = active;
+ return send_to_matching (msg_, flags_);
+}
+
+int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
{
// Is this end of a multipart message?
bool msg_more = msg_->flags () & msg_t::more;
- // Push the message to active pipes.
+ // Push the message to matching pipes.
distribute (msg_, flags_);
// If mutlipart message is fully sent, activate all the eligible pipes.
if (!msg_more)
active = eligible;
+ // Mark all the pipes as non-matching.
+ matching = 0;
+
more = msg_more;
return 0;
@@ -98,8 +125,8 @@ int zmq::dist_t::send (msg_t *msg_, int flags_)
void zmq::dist_t::distribute (msg_t *msg_, int flags_)
{
- // If there are no active pipes available, simply drop the message.
- if (active == 0) {
+ // If there are no matching pipes available, simply drop the message.
+ if (matching == 0) {
int rc = msg_->close ();
errno_assert (rc == 0);
rc = msg_->init ();
@@ -107,12 +134,12 @@ void zmq::dist_t::distribute (msg_t *msg_, int flags_)
return;
}
- // Add active-1 references to the message. We already hold one reference,
+ // Add matching-1 references to the message. We already hold one reference,
// that's why -1.
- msg_->add_refs (active - 1);
+ msg_->add_refs (matching - 1);
- // Push copy of the message to each active pipe.
- for (pipes_t::size_type i = 0; i < active;) {
+ // Push copy of the message to each matching pipe.
+ for (pipes_t::size_type i = 0; i < matching;) {
if (!write (pipes [i], msg_))
msg_->rm_refs (1);
else
@@ -133,6 +160,8 @@ bool zmq::dist_t::has_out ()
bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
{
if (!pipe_->write (msg_)) {
+ pipes.swap (pipes.index (pipe_), matching - 1);
+ matching--;
pipes.swap (pipes.index (pipe_), active - 1);
active--;
pipes.swap (active, eligible - 1);
diff --git a/src/dist.hpp b/src/dist.hpp
index 10613c1..005bb60 100644
--- a/src/dist.hpp
+++ b/src/dist.hpp
@@ -38,11 +38,26 @@ namespace zmq
dist_t ();
~dist_t ();
+ // Adds the pipe to the distributor object.
void attach (class pipe_t *pipe_);
+
+ // Activates pipe that have previously reached high watermark.
void activated (class pipe_t *pipe_);
+
+ // Mark the pipe as matching. Subsequent call to send_to_matching
+ // will send message also to this pipe.
+ void match (class pipe_t *pipe_);
+
+ // Removes the pipe from the distributor object.
void terminated (class pipe_t *pipe_);
- int send (class msg_t *msg_, int flags_);
+ // Send the message to all the outbound pipes. After the call all the
+ // pipes are marked as non-matching.
+ int send_to_matching (class msg_t *msg_, int flags_);
+
+ // Send the message to the matching outbound pipes.
+ int send_to_all (class msg_t *msg_, int flags_);
+
bool has_out ();
private:
@@ -58,6 +73,9 @@ namespace zmq
typedef array_t <class pipe_t, 2> pipes_t;
pipes_t pipes;
+ // Number of all the pipes to send the next message to.
+ pipes_t::size_type matching;
+
// Number of active pipes. All the active pipes are located at the
// beginning of the pipes array. These are the pipes the messages
// can be sent to at the moment.
diff --git a/src/mtrie.cpp b/src/mtrie.cpp
index 91f6852..fafac2d 100644
--- a/src/mtrie.cpp
+++ b/src/mtrie.cpp
@@ -206,10 +206,21 @@ bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_,
return next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_);
}
-void zmq::mtrie_t::match (unsigned char *data_, size_t size_, pipes_t &pipes_)
+void zmq::mtrie_t::match (unsigned char *data_, size_t size_,
+ void (*func_) (pipe_t *pipe_, void *arg_), void *arg_)
{
- // Merge the subscriptions from this node to the resultset.
- pipes_.insert (pipes.begin (), pipes.end ());
+ match_helper (data_, size_, func_, arg_);
+}
+
+void zmq::mtrie_t::match_helper (unsigned char *data_, size_t size_,
+ void (*func_) (pipe_t *pipe_, void *arg_), void *arg_)
+{
+ // TODO: This function is on critical path. Rewrite it as iteration
+ // rather than recursion.
+
+ // Signal the pipes attached to this node.
+ for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); ++it)
+ func_ (*it, arg_);
// If there are no subnodes in the trie, return.
if (count == 0)
@@ -217,14 +228,14 @@ void zmq::mtrie_t::match (unsigned char *data_, size_t size_, pipes_t &pipes_)
// If there's one subnode (optimisation).
if (count == 1) {
- next.node->match (data_ + 1, size_ - 1, pipes_);
+ next.node->match (data_ + 1, size_ - 1, func_, arg_);
return;
}
// If there are multiple subnodes.
for (unsigned char c = 0; c != count; c++) {
if (next.table [c])
- next.table [c]->match (data_ + 1, size_ - 1, pipes_);
+ next.table [c]->match (data_ + 1, size_ - 1, func_, arg_);
}
}
diff --git a/src/mtrie.hpp b/src/mtrie.hpp
index cd47029..68a3f2c 100644
--- a/src/mtrie.hpp
+++ b/src/mtrie.hpp
@@ -35,8 +35,6 @@ namespace zmq
{
public:
- typedef std::set <class pipe_t*> pipes_t;
-
mtrie_t ();
~mtrie_t ();
@@ -55,8 +53,9 @@ namespace zmq
// actually removed rather than de-duplicated.
bool rm (unsigned char *prefix_, size_t size_, class pipe_t *pipe_);
- // Get all matching pipes.
- void match (unsigned char *data_, size_t size_, pipes_t &pipes_);
+ // Signal all the matching pipes.
+ void match (unsigned char *data_, size_t size_,
+ void (*func_) (class pipe_t *pipe_, void *arg_), void *arg_);
private:
@@ -68,8 +67,12 @@ namespace zmq
void *arg_);
bool rm_helper (unsigned char *prefix_, size_t size_,
class pipe_t *pipe_);
+ void match_helper (unsigned char *data_, size_t size_,
+ void (*func_) (class pipe_t *pipe_, void *arg_), void *arg_);
+ typedef std::set <class pipe_t*> pipes_t;
pipes_t pipes;
+
unsigned char min;
unsigned short count;
union {
diff --git a/src/xpub.cpp b/src/xpub.cpp
index 4b41696..9078de3 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -91,9 +91,21 @@ void zmq::xpub_t::xterminated (pipe_t *pipe_)
dist.terminated (pipe_);
}
+void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
+{
+ xpub_t *self = (xpub_t*) arg_;
+ self->dist.match (pipe_);
+}
+
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
-{
- return dist.send (msg_, flags_);
+{
+ // Find the matching pipes.
+ subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
+ mark_as_matching, this);
+
+ // Send the message to all the pipes that were marked as matching
+ // in the previous step.
+ return dist.send_to_matching (msg_, flags_);
}
bool zmq::xpub_t::xhas_out ()
diff --git a/src/xpub.hpp b/src/xpub.hpp
index c5d64b5..740d1e2 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -57,6 +57,9 @@ namespace zmq
static void send_unsubscription (unsigned char *data_, size_t size_,
void *arg_);
+ // Function to be applied to each matching pipes.
+ static void mark_as_matching (class pipe_t *pipe_, void *arg_);
+
// List of all subscriptions mapped to corresponding pipes.
mtrie_t subscriptions;
diff --git a/src/xsub.cpp b/src/xsub.cpp
index 729f6a4..a847d7f 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -87,13 +87,13 @@ int zmq::xsub_t::xsend (msg_t *msg_, int flags_)
// Process the subscription.
if (*data == 1) {
if (subscriptions.add (data + 1, size - 1))
- return dist.send (msg_, flags_);
+ return dist.send_to_all (msg_, flags_);
else
return 0;
}
else if (*data == 0) {
if (subscriptions.rm (data + 1, size - 1))
- return dist.send (msg_, flags_);
+ return dist.send_to_all (msg_, flags_);
else
return 0;
}