summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-09-11 16:23:16 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-09-11 16:23:16 +0200
commit7be7962f9802b48e66663416097eb76edfa83e1e (patch)
treee9d118736dd35e6db77e5a600ebf543c27695e4f /src
parent42ad2aa02d3b14eaa3c36c3f1a5a5789546ba411 (diff)
prefix-style message filtering added
Diffstat (limited to 'src')
-rw-r--r--src/sub.cpp61
-rw-r--r--src/sub.hpp10
2 files changed, 55 insertions, 16 deletions
diff --git a/src/sub.cpp b/src/sub.cpp
index 954eb87..8c1ef9b 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -23,7 +23,8 @@
#include "err.hpp"
zmq::sub_t::sub_t (class app_thread_t *parent_) :
- socket_base_t (parent_)
+ socket_base_t (parent_),
+ all_count (0)
{
}
@@ -36,18 +37,41 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_,
{
if (option_ == ZMQ_SUBSCRIBE) {
std::string subscription ((const char*) optval_, optvallen_);
- subscriptions.insert (subscription);
+ if (subscription == "*")
+ all_count++;
+ else if (subscription [subscription.size () - 1] == '*')
+ prefixes.insert (subscription.substr (0, subscription.size () - 1));
+ else
+ topics.insert (subscription);
return 0;
}
if (option_ == ZMQ_UNSUBSCRIBE) {
std::string subscription ((const char*) optval_, optvallen_);
- subscriptions_t::iterator it = subscriptions.find (subscription);
- if (it == subscriptions.end ()) {
- errno = EINVAL;
- return -1;
+ if (subscription == "*") {
+ if (!all_count) {
+ errno = EINVAL;
+ return -1;
+ }
+ all_count--;
+ }
+ else if (subscription [subscription.size () - 1] == '*') {
+ subscriptions_t::iterator it = prefixes.find (
+ subscription.substr (0, subscription.size () - 1));
+ if (it == prefixes.end ()) {
+ errno = EINVAL;
+ return -1;
+ }
+ prefixes.erase (it);
+ }
+ else {
+ subscriptions_t::iterator it = topics.find (subscription);
+ if (it == topics.end ()) {
+ errno = EINVAL;
+ return -1;
+ }
+ topics.erase (it);
}
- subscriptions.erase (it);
return 0;
}
@@ -65,18 +89,27 @@ int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)
if (rc != 0 && errno == EAGAIN)
return -1;
+ // If there is at least one "*" subscription, the message matches.
+ if (all_count)
+ return 0;
+
// Check the message format.
// TODO: We should either ignore the message or drop the connection
// if the message doesn't conform with the expected format.
unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
zmq_assert (*data <= zmq_msg_size (msg_) - 1);
-
- // Check whether the message matches at least one subscription.
std::string topic ((const char*) (data + 1), *data);
- subscriptions_t::iterator it = subscriptions.find (topic);
- if (it != subscriptions.end ())
- break;
- }
- return 0;
+ // Check whether the message matches at least one prefix subscription.
+ for (subscriptions_t::iterator it = prefixes.begin ();
+ it != prefixes.end (); it++)
+ if (it->size () <= topic.size () &&
+ *it == topic.substr (0, it->size ()))
+ return 0;
+
+ // Check whether the message matches an exact match subscription.
+ subscriptions_t::iterator it = topics.find (topic);
+ if (it != topics.end ())
+ return 0;
+ }
}
diff --git a/src/sub.hpp b/src/sub.hpp
index 1d4fdf9..c88d30c 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -41,9 +41,15 @@ namespace zmq
private:
- // List of all the active subscriptions.
+ // Number of active "*" subscriptions.
+ int all_count;
+
+ // List of all prefix subscriptions.
typedef std::multiset <std::string> subscriptions_t;
- subscriptions_t subscriptions;
+ subscriptions_t prefixes;
+
+ // List of all exact match subscriptions.
+ subscriptions_t topics;
};
}