diff options
| -rw-r--r-- | src/sub.cpp | 111 | ||||
| -rw-r--r-- | src/sub.hpp | 19 | ||||
| -rw-r--r-- | src/xsub.cpp | 107 | ||||
| -rw-r--r-- | src/xsub.hpp | 17 | 
4 files changed, 134 insertions, 120 deletions
diff --git a/src/sub.cpp b/src/sub.cpp index 2d6ade6..11c4532 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -22,19 +22,39 @@  #include "msg.hpp"  zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) : -    xsub_t (parent_, tid_) +    xsub_t (parent_, tid_), +    has_message (false), +    more (false)  {      options.type = ZMQ_SUB; +    int rc = message.init (); +    errno_assert (rc == 0);  }  zmq::sub_t::~sub_t ()  { +    int rc = message.close (); +    errno_assert (rc == 0);  }  int zmq::sub_t::xsetsockopt (int option_, const void *optval_,      size_t optvallen_)  { -    if (option_ != ZMQ_SUBSCRIBE && option_ != ZMQ_UNSUBSCRIBE) { +    //  Process a subscription. +    if (option_ == ZMQ_SUBSCRIBE) +        subscriptions.add ((unsigned char*) optval_, optvallen_); + +    //  Process an unsubscription. Return error if there is no corresponding +    //  subscription. +    else if (option_ == ZMQ_UNSUBSCRIBE) { +        if (!subscriptions.rm ((unsigned char*) optval_, optvallen_)) { +            errno = EINVAL; +            return -1; +        } +    } + +    //  Unknow option. +    else {          errno = EINVAL;          return -1;      } @@ -74,3 +94,90 @@ bool zmq::sub_t::xhas_out ()      //  Overload the XSUB's send.      return false;  } + +int zmq::sub_t::xrecv (msg_t *msg_, int flags_) +{ +    //  If there's already a message prepared by a previous call to zmq_poll, +    //  return it straight ahead. +    if (has_message) { +        int rc = msg_->move (message); +        errno_assert (rc == 0); +        has_message = false; +        more = msg_->flags () & msg_t::more; +        return 0; +    } + +    //  TODO: This can result in infinite loop in the case of continuous +    //  stream of non-matching messages which breaks the non-blocking recv +    //  semantics. +    while (true) { + +        //  Get a message using fair queueing algorithm. +        int rc = xsub_t::xrecv (msg_, flags_); + +        //  If there's no message available, return immediately. +        //  The same when error occurs. +        if (rc != 0) +            return -1; + +        //  Check whether the message matches at least one subscription. +        //  Non-initial parts of the message are passed  +        if (more || match (msg_)) { +            more = msg_->flags () & msg_t::more; +            return 0; +        } + +        //  Message doesn't match. Pop any remaining parts of the message +        //  from the pipe. +        while (msg_->flags () & msg_t::more) { +            rc = xsub_t::xrecv (msg_, ZMQ_DONTWAIT); +            zmq_assert (rc == 0); +        } +    } +} + +bool zmq::sub_t::xhas_in () +{ +    //  There are subsequent parts of the partly-read message available. +    if (more) +        return true; + +    //  If there's already a message prepared by a previous call to zmq_poll, +    //  return straight ahead. +    if (has_message) +        return true; + +    //  TODO: This can result in infinite loop in the case of continuous +    //  stream of non-matching messages. +    while (true) { + +        //  Get a message using fair queueing algorithm. +        int rc = xsub_t::xrecv (&message, ZMQ_DONTWAIT); + +        //  If there's no message available, return immediately. +        //  The same when error occurs. +        if (rc != 0) { +            zmq_assert (errno == EAGAIN); +            return false; +        } + +        //  Check whether the message matches at least one subscription. +        if (match (&message)) { +            has_message = true; +            return true; +        } + +        //  Message doesn't match. Pop any remaining parts of the message +        //  from the pipe. +        while (message.flags () & msg_t::more) { +            rc = xsub_t::xrecv (&message, ZMQ_DONTWAIT); +            zmq_assert (rc == 0); +        } +    } +} + +bool zmq::sub_t::match (msg_t *msg_) +{ +    return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ()); +} + diff --git a/src/sub.hpp b/src/sub.hpp index 8ba8987..91a5b65 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -22,6 +22,8 @@  #define __ZMQ_SUB_HPP_INCLUDED__  #include "xsub.hpp" +#include "trie.hpp" +#include "msg.hpp"  namespace zmq  { @@ -38,9 +40,26 @@ namespace zmq          int xsetsockopt (int option_, const void *optval_, size_t optvallen_);          int xsend (class msg_t *msg_, int options_);          bool xhas_out (); +        int xrecv (class msg_t *msg_, int flags_); +        bool xhas_in ();      private: +        //  Check whether the message matches at least one subscription. +        bool match (class msg_t *msg_); + +        //  The repository of subscriptions. +        trie_t subscriptions; + +        //  If true, 'message' contains a matching message to return on the +        //  next recv call. +        bool has_message; +        msg_t message; + +        //  If true, part of a multipart message was already received, but +        //  there are following parts still waiting. +        bool more; +          sub_t (const sub_t&);          const sub_t &operator = (const sub_t&);      }; diff --git a/src/xsub.cpp b/src/xsub.cpp index c5f610f..6928d82 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -24,19 +24,13 @@  #include "err.hpp"  zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) : -    socket_base_t (parent_, tid_), -    has_message (false), -    more (false) +    socket_base_t (parent_, tid_)  {      options.type = ZMQ_XSUB; -    int rc = message.init (); -    errno_assert (rc == 0);  }  zmq::xsub_t::~xsub_t ()  { -    int rc = message.close (); -    errno_assert (rc == 0);  }  void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) @@ -57,21 +51,8 @@ void zmq::xsub_t::xterminated (pipe_t *pipe_)  int zmq::xsub_t::xsend (msg_t *msg_, int options_)  { -    size_t size = msg_->size (); -    unsigned char *data = (unsigned char*) msg_->data (); - -    //  Malformed subscriptions are dropped silently. -    if (size >= 1) { - -        //  Process a subscription. -        if (*data == 1) -            subscriptions.add (data + 1, size - 1); - -        //  Process an unsubscription. Invalid unsubscription is ignored. -        if (*data == 0) -            subscriptions.rm (data + 1, size - 1); -    } - +    //  TODO: Once we'll send the subscription upstream here. For now +    //  just empty the message.      int rc = msg_->close ();      errno_assert (rc == 0);      rc = msg_->init (); @@ -85,89 +66,13 @@ bool zmq::xsub_t::xhas_out ()      return true;  } -int zmq::xsub_t::xrecv (msg_t *msg_, int flags_) +int zmq::xsub_t::xrecv (class msg_t *msg_, int flags_)  { -    //  If there's already a message prepared by a previous call to zmq_poll, -    //  return it straight ahead. -    if (has_message) { -        int rc = msg_->move (message); -        errno_assert (rc == 0); -        has_message = false; -        more = msg_->flags () & msg_t::more; -        return 0; -    } - -    //  TODO: This can result in infinite loop in the case of continuous -    //  stream of non-matching messages which breaks the non-blocking recv -    //  semantics. -    while (true) { - -        //  Get a message using fair queueing algorithm. -        int rc = fq.recv (msg_, flags_); - -        //  If there's no message available, return immediately. -        //  The same when error occurs. -        if (rc != 0) -            return -1; - -        //  Check whether the message matches at least one subscription. -        //  Non-initial parts of the message are passed  -        if (more || match (msg_)) { -            more = msg_->flags () & msg_t::more; -            return 0; -        } - -        //  Message doesn't match. Pop any remaining parts of the message -        //  from the pipe. -        while (msg_->flags () & msg_t::more) { -            rc = fq.recv (msg_, ZMQ_DONTWAIT); -            zmq_assert (rc == 0); -        } -    } +    return fq.recv (msg_, flags_);  }  bool zmq::xsub_t::xhas_in ()  { -    //  There are subsequent parts of the partly-read message available. -    if (more) -        return true; - -    //  If there's already a message prepared by a previous call to zmq_poll, -    //  return straight ahead. -    if (has_message) -        return true; - -    //  TODO: This can result in infinite loop in the case of continuous -    //  stream of non-matching messages. -    while (true) { - -        //  Get a message using fair queueing algorithm. -        int rc = fq.recv (&message, ZMQ_DONTWAIT); - -        //  If there's no message available, return immediately. -        //  The same when error occurs. -        if (rc != 0) { -            zmq_assert (errno == EAGAIN); -            return false; -        } - -        //  Check whether the message matches at least one subscription. -        if (match (&message)) { -            has_message = true; -            return true; -        } - -        //  Message doesn't match. Pop any remaining parts of the message -        //  from the pipe. -        while (message.flags () & msg_t::more) { -            rc = fq.recv (&message, ZMQ_DONTWAIT); -            zmq_assert (rc == 0); -        } -    } -} - -bool zmq::xsub_t::match (msg_t *msg_) -{ -    return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ()); +    return fq.has_in ();  } diff --git a/src/xsub.hpp b/src/xsub.hpp index 58ddae5..ebd6259 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -21,9 +21,7 @@  #ifndef __ZMQ_XSUB_HPP_INCLUDED__  #define __ZMQ_XSUB_HPP_INCLUDED__ -#include "trie.hpp"  #include "socket_base.hpp" -#include "msg.hpp"  #include "fq.hpp"  namespace zmq @@ -50,24 +48,9 @@ namespace zmq      private: -        //  Check whether the message matches at least one subscription. -        bool match (class msg_t *msg_); -          //  Fair queueing object for inbound pipes.          fq_t fq; -        //  The repository of subscriptions. -        trie_t subscriptions; - -        //  If true, 'message' contains a matching message to return on the -        //  next recv call. -        bool has_message; -        msg_t message; - -        //  If true, part of a multipart message was already received, but -        //  there are following parts still waiting. -        bool more; -          xsub_t (const xsub_t&);          const xsub_t &operator = (const xsub_t&);      };  | 
