From 7be7962f9802b48e66663416097eb76edfa83e1e Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 11 Sep 2009 16:23:16 +0200 Subject: prefix-style message filtering added --- src/sub.cpp | 61 +++++++++++++++++++++++++++++++++++++++++++++++-------------- src/sub.hpp | 10 ++++++++-- 2 files changed, 55 insertions(+), 16 deletions(-) (limited to 'src') diff --git a/src/sub.cpp b/src/sub.cpp index 954eb87..8c1ef9b 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -23,7 +23,8 @@ #include "err.hpp" zmq::sub_t::sub_t (class app_thread_t *parent_) : - socket_base_t (parent_) + socket_base_t (parent_), + all_count (0) { } @@ -36,18 +37,41 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_, { if (option_ == ZMQ_SUBSCRIBE) { std::string subscription ((const char*) optval_, optvallen_); - subscriptions.insert (subscription); + if (subscription == "*") + all_count++; + else if (subscription [subscription.size () - 1] == '*') + prefixes.insert (subscription.substr (0, subscription.size () - 1)); + else + topics.insert (subscription); return 0; } if (option_ == ZMQ_UNSUBSCRIBE) { std::string subscription ((const char*) optval_, optvallen_); - subscriptions_t::iterator it = subscriptions.find (subscription); - if (it == subscriptions.end ()) { - errno = EINVAL; - return -1; + if (subscription == "*") { + if (!all_count) { + errno = EINVAL; + return -1; + } + all_count--; + } + else if (subscription [subscription.size () - 1] == '*') { + subscriptions_t::iterator it = prefixes.find ( + subscription.substr (0, subscription.size () - 1)); + if (it == prefixes.end ()) { + errno = EINVAL; + return -1; + } + prefixes.erase (it); + } + else { + subscriptions_t::iterator it = topics.find (subscription); + if (it == topics.end ()) { + errno = EINVAL; + return -1; + } + topics.erase (it); } - subscriptions.erase (it); return 0; } @@ -65,18 +89,27 @@ int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_) if (rc != 0 && errno == EAGAIN) return -1; + // If there is at least one "*" subscription, the message matches. + if (all_count) + return 0; + // Check the message format. // TODO: We should either ignore the message or drop the connection // if the message doesn't conform with the expected format. unsigned char *data = (unsigned char*) zmq_msg_data (msg_); zmq_assert (*data <= zmq_msg_size (msg_) - 1); - - // Check whether the message matches at least one subscription. std::string topic ((const char*) (data + 1), *data); - subscriptions_t::iterator it = subscriptions.find (topic); - if (it != subscriptions.end ()) - break; - } - return 0; + // Check whether the message matches at least one prefix subscription. + for (subscriptions_t::iterator it = prefixes.begin (); + it != prefixes.end (); it++) + if (it->size () <= topic.size () && + *it == topic.substr (0, it->size ())) + return 0; + + // Check whether the message matches an exact match subscription. + subscriptions_t::iterator it = topics.find (topic); + if (it != topics.end ()) + return 0; + } } diff --git a/src/sub.hpp b/src/sub.hpp index 1d4fdf9..c88d30c 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -41,9 +41,15 @@ namespace zmq private: - // List of all the active subscriptions. + // Number of active "*" subscriptions. + int all_count; + + // List of all prefix subscriptions. typedef std::multiset subscriptions_t; - subscriptions_t subscriptions; + subscriptions_t prefixes; + + // List of all exact match subscriptions. + subscriptions_t topics; }; } -- cgit v1.2.3