diff options
| author | Martin Sustrik <sustrik@250bpm.com> | 2011-06-11 20:29:56 +0200 | 
|---|---|---|
| committer | Martin Sustrik <sustrik@250bpm.com> | 2011-06-11 20:29:56 +0200 | 
| commit | bd86def1c799a35d5cef0c0a9a1347a18fea227e (patch) | |
| tree | 4d70d0053ef86b5a614bd19ad4c5f420017279c7 /src | |
| parent | 3935258b826adc31815be4f91b2f6eb02bb3c8ed (diff) | |
Actual message filtering happens in XPUB socket
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
| -rw-r--r-- | src/dist.cpp | 47 | ||||
| -rw-r--r-- | src/dist.hpp | 20 | ||||
| -rw-r--r-- | src/mtrie.cpp | 21 | ||||
| -rw-r--r-- | src/mtrie.hpp | 11 | ||||
| -rw-r--r-- | src/xpub.cpp | 16 | ||||
| -rw-r--r-- | src/xpub.hpp | 3 | ||||
| -rw-r--r-- | src/xsub.cpp | 4 | 
7 files changed, 99 insertions, 23 deletions
| diff --git a/src/dist.cpp b/src/dist.cpp index f7f0488..707b9c1 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -25,6 +25,7 @@  #include "likely.hpp"  zmq::dist_t::dist_t () : +    matching (0),      active (0),      eligible (0),      more (false) @@ -54,10 +55,27 @@ void zmq::dist_t::attach (pipe_t *pipe_)      }  } +void zmq::dist_t::match (pipe_t *pipe_) +{ +    //  If pipe is already matching do nothing. +    if (pipes.index (pipe_) < matching) +        return; + +    //  If the pipe isn't eligible, ignore it. +    if (pipes.index (pipe_) >= eligible) +        return; + +    //  Mark the pipe as matching. +    pipes.swap (pipes.index (pipe_), matching); +    matching++;     +} +  void zmq::dist_t::terminated (pipe_t *pipe_)  { -    //  Remove the pipe from the list; adjust number of active and/or +    //  Remove the pipe from the list; adjust number of matching, active and/or      //  eligible pipes accordingly. +    if (pipes.index (pipe_) < matching) +        matching--;      if (pipes.index (pipe_) < active)          active--;      if (pipes.index (pipe_) < eligible) @@ -79,18 +97,27 @@ void zmq::dist_t::activated (pipe_t *pipe_)      }  } -int zmq::dist_t::send (msg_t *msg_, int flags_) +int zmq::dist_t::send_to_all (msg_t *msg_, int flags_) +{ +    matching = active; +    return send_to_matching (msg_, flags_); +} + +int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)  {      //  Is this end of a multipart message?      bool msg_more = msg_->flags () & msg_t::more; -    //  Push the message to active pipes. +    //  Push the message to matching pipes.      distribute (msg_, flags_);      //  If mutlipart message is fully sent, activate all the eligible pipes.      if (!msg_more)          active = eligible; +    //  Mark all the pipes as non-matching. +    matching = 0; +      more = msg_more;      return 0; @@ -98,8 +125,8 @@ int zmq::dist_t::send (msg_t *msg_, int flags_)  void zmq::dist_t::distribute (msg_t *msg_, int flags_)  { -    //  If there are no active pipes available, simply drop the message. -    if (active == 0) { +    //  If there are no matching pipes available, simply drop the message. +    if (matching == 0) {          int rc = msg_->close ();          errno_assert (rc == 0);          rc = msg_->init (); @@ -107,12 +134,12 @@ void zmq::dist_t::distribute (msg_t *msg_, int flags_)          return;      } -    //  Add active-1 references to the message. We already hold one reference, +    //  Add matching-1 references to the message. We already hold one reference,      //  that's why -1. -    msg_->add_refs (active - 1); +    msg_->add_refs (matching - 1); -    //  Push copy of the message to each active pipe. -    for (pipes_t::size_type i = 0; i < active;) { +    //  Push copy of the message to each matching pipe. +    for (pipes_t::size_type i = 0; i < matching;) {          if (!write (pipes [i], msg_))              msg_->rm_refs (1);          else @@ -133,6 +160,8 @@ bool zmq::dist_t::has_out ()  bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)  {      if (!pipe_->write (msg_)) { +        pipes.swap (pipes.index (pipe_), matching - 1); +        matching--;          pipes.swap (pipes.index (pipe_), active - 1);          active--;          pipes.swap (active, eligible - 1); diff --git a/src/dist.hpp b/src/dist.hpp index 10613c1..005bb60 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -38,11 +38,26 @@ namespace zmq          dist_t ();          ~dist_t (); +        //  Adds the pipe to the distributor object.          void attach (class pipe_t *pipe_); + +        //  Activates pipe that have previously reached high watermark.          void activated (class pipe_t *pipe_); + +        //  Mark the pipe as matching. Subsequent call to send_to_matching +        //  will send message also to this pipe. +        void match (class pipe_t *pipe_); + +        //  Removes the pipe from the distributor object.          void terminated (class pipe_t *pipe_); -        int send (class msg_t *msg_, int flags_); +        //  Send the message to all the outbound pipes. After the call all the +        //  pipes are marked as non-matching. +        int send_to_matching (class msg_t *msg_, int flags_); + +        //  Send the message to the matching outbound pipes. +        int send_to_all (class msg_t *msg_, int flags_); +          bool has_out ();      private: @@ -58,6 +73,9 @@ namespace zmq          typedef array_t <class pipe_t, 2> pipes_t;          pipes_t pipes; +        //  Number of all the pipes to send the next message to. +        pipes_t::size_type matching; +          //  Number of active pipes. All the active pipes are located at the          //  beginning of the pipes array. These are the pipes the messages          //  can be sent to at the moment. diff --git a/src/mtrie.cpp b/src/mtrie.cpp index 91f6852..fafac2d 100644 --- a/src/mtrie.cpp +++ b/src/mtrie.cpp @@ -206,10 +206,21 @@ bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_,      return next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_);  } -void zmq::mtrie_t::match (unsigned char *data_, size_t size_, pipes_t &pipes_) +void zmq::mtrie_t::match (unsigned char *data_, size_t size_, +    void (*func_) (pipe_t *pipe_, void *arg_), void *arg_)  { -    //  Merge the subscriptions from this node to the resultset. -    pipes_.insert (pipes.begin (), pipes.end ()); +    match_helper (data_, size_, func_, arg_); +} + +void zmq::mtrie_t::match_helper (unsigned char *data_, size_t size_, +    void (*func_) (pipe_t *pipe_, void *arg_), void *arg_) +{ +    //  TODO: This function is on critical path. Rewrite it as iteration +    //  rather than recursion. + +    //  Signal the pipes attached to this node. +    for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); ++it) +        func_ (*it, arg_);      //  If there are no subnodes in the trie, return.      if (count == 0) @@ -217,14 +228,14 @@ void zmq::mtrie_t::match (unsigned char *data_, size_t size_, pipes_t &pipes_)      //  If there's one subnode (optimisation).      if (count == 1) { -        next.node->match (data_ + 1, size_ - 1, pipes_); +        next.node->match (data_ + 1, size_ - 1, func_, arg_);          return;      }      //  If there are multiple subnodes.      for (unsigned char c = 0; c != count; c++) {          if (next.table [c]) -            next.table [c]->match (data_ + 1, size_ - 1, pipes_); +            next.table [c]->match (data_ + 1, size_ - 1, func_, arg_);      }     } diff --git a/src/mtrie.hpp b/src/mtrie.hpp index cd47029..68a3f2c 100644 --- a/src/mtrie.hpp +++ b/src/mtrie.hpp @@ -35,8 +35,6 @@ namespace zmq      {      public: -        typedef std::set <class pipe_t*> pipes_t; -          mtrie_t ();          ~mtrie_t (); @@ -55,8 +53,9 @@ namespace zmq          //  actually removed rather than de-duplicated.          bool rm (unsigned char *prefix_, size_t size_, class pipe_t *pipe_); -        //  Get all matching pipes. -        void match (unsigned char *data_, size_t size_, pipes_t &pipes_); +        //  Signal all the matching pipes. +        void match (unsigned char *data_, size_t size_, +            void (*func_) (class pipe_t *pipe_, void *arg_), void *arg_);      private: @@ -68,8 +67,12 @@ namespace zmq              void *arg_);          bool rm_helper (unsigned char *prefix_, size_t size_,              class pipe_t *pipe_); +        void match_helper (unsigned char *data_, size_t size_, +            void (*func_) (class pipe_t *pipe_, void *arg_), void *arg_); +        typedef std::set <class pipe_t*> pipes_t;          pipes_t pipes; +          unsigned char min;          unsigned short count;          union { diff --git a/src/xpub.cpp b/src/xpub.cpp index 4b41696..9078de3 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -91,9 +91,21 @@ void zmq::xpub_t::xterminated (pipe_t *pipe_)      dist.terminated (pipe_);  } +void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) +{ +    xpub_t *self = (xpub_t*) arg_; +    self->dist.match (pipe_); +} +  int zmq::xpub_t::xsend (msg_t *msg_, int flags_) -{     -    return dist.send (msg_, flags_); +{ +    //  Find the matching pipes. +    subscriptions.match ((unsigned char*) msg_->data (), msg_->size (), +        mark_as_matching, this); + +    //  Send the message to all the pipes that were marked as matching +    //  in the previous step. +    return dist.send_to_matching (msg_, flags_);  }  bool zmq::xpub_t::xhas_out () diff --git a/src/xpub.hpp b/src/xpub.hpp index c5d64b5..740d1e2 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -57,6 +57,9 @@ namespace zmq          static void send_unsubscription (unsigned char *data_, size_t size_,              void *arg_); +        //  Function to be applied to each matching pipes. +        static void mark_as_matching (class pipe_t *pipe_, void *arg_); +          //  List of all subscriptions mapped to corresponding pipes.          mtrie_t subscriptions; diff --git a/src/xsub.cpp b/src/xsub.cpp index 729f6a4..a847d7f 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -87,13 +87,13 @@ int zmq::xsub_t::xsend (msg_t *msg_, int flags_)      // Process the subscription.      if (*data == 1) {          if (subscriptions.add (data + 1, size - 1)) -            return dist.send (msg_, flags_); +            return dist.send_to_all (msg_, flags_);          else              return 0;      }      else if (*data == 0) {          if (subscriptions.rm (data + 1, size - 1)) -            return dist.send (msg_, flags_); +            return dist.send_to_all (msg_, flags_);          else              return 0;      } | 
