diff options
Diffstat (limited to 'src/sub.cpp')
| -rw-r--r-- | src/sub.cpp | 61 | 
1 files changed, 47 insertions, 14 deletions
| diff --git a/src/sub.cpp b/src/sub.cpp index 954eb87..8c1ef9b 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -23,7 +23,8 @@  #include "err.hpp"  zmq::sub_t::sub_t (class app_thread_t *parent_) : -    socket_base_t (parent_) +    socket_base_t (parent_), +    all_count (0)  {  } @@ -36,18 +37,41 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_,  {      if (option_ == ZMQ_SUBSCRIBE) {          std::string subscription ((const char*) optval_, optvallen_); -        subscriptions.insert (subscription); +        if (subscription == "*") +            all_count++; +        else if (subscription [subscription.size () - 1] == '*') +            prefixes.insert (subscription.substr (0, subscription.size () - 1)); +        else +            topics.insert (subscription);          return 0;      }      if (option_ == ZMQ_UNSUBSCRIBE) {          std::string subscription ((const char*) optval_, optvallen_); -        subscriptions_t::iterator it = subscriptions.find (subscription); -        if (it == subscriptions.end ()) { -            errno = EINVAL; -            return -1; +        if (subscription == "*") { +            if (!all_count) { +                errno = EINVAL; +                return -1; +            } +            all_count--; +        } +        else if (subscription [subscription.size () - 1] == '*') { +            subscriptions_t::iterator it = prefixes.find ( +                subscription.substr (0, subscription.size () - 1)); +            if (it == prefixes.end ()) { +                errno = EINVAL; +                return -1; +            } +            prefixes.erase (it); +        } +        else { +            subscriptions_t::iterator it = topics.find (subscription); +            if (it == topics.end ()) { +                errno = EINVAL; +                return -1; +            } +            topics.erase (it);          } -        subscriptions.erase (it);          return 0;      } @@ -65,18 +89,27 @@ int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)          if (rc != 0 && errno == EAGAIN)              return -1; +        //  If there is at least one "*" subscription, the message matches. +        if (all_count) +            return 0; +          //  Check the message format.          //  TODO: We should either ignore the message or drop the connection          //  if the message doesn't conform with the expected format.          unsigned char *data = (unsigned char*) zmq_msg_data (msg_);          zmq_assert (*data <= zmq_msg_size (msg_) - 1); - -        //  Check whether the message matches at least one subscription.          std::string topic ((const char*) (data + 1), *data); -        subscriptions_t::iterator it = subscriptions.find (topic); -        if (it != subscriptions.end ()) -            break; -    } -    return 0; +        //  Check whether the message matches at least one prefix subscription. +        for (subscriptions_t::iterator it = prefixes.begin (); +              it != prefixes.end (); it++) +            if (it->size () <= topic.size () && +                  *it == topic.substr (0, it->size ())) +                return 0; + +        //  Check whether the message matches an exact match subscription. +        subscriptions_t::iterator it = topics.find (topic); +        if (it != topics.end ()) +            return 0; +    }  } | 
