From 72161fb075025410312c6735d681c3de9a36a4e5 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 28 Dec 2009 21:29:31 +0100 Subject: format of subscriptions changed (no * needed anymore) --- src/sub.cpp | 90 +++++++++++++++++++++++++------------------------------------ 1 file changed, 37 insertions(+), 53 deletions(-) (limited to 'src/sub.cpp') diff --git a/src/sub.cpp b/src/sub.cpp index e5dbe76..95039d7 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -17,6 +17,8 @@ along with this program. If not, see . */ +#include + #include "../bindings/c/zmq.h" #include "sub.hpp" @@ -67,41 +69,30 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { if (option_ == ZMQ_SUBSCRIBE) { - std::string subscription ((const char*) optval_, optvallen_); - if (subscription == "*") + if (!optvallen_) all_count++; - else if (subscription [subscription.size () - 1] == '*') - prefixes.insert (subscription.substr (0, subscription.size () - 1)); - else - topics.insert (subscription); + else + subscriptions.insert (std::string ((const char*) optval_, + optvallen_)); return 0; } if (option_ == ZMQ_UNSUBSCRIBE) { - std::string subscription ((const char*) optval_, optvallen_); - if (subscription == "*") { + if (!optvallen_) { if (!all_count) { errno = EINVAL; return -1; } all_count--; } - else if (subscription [subscription.size () - 1] == '*') { - subscriptions_t::iterator it = prefixes.find ( - subscription.substr (0, subscription.size () - 1)); - if (it == prefixes.end ()) { - errno = EINVAL; - return -1; - } - prefixes.erase (it); - } else { - subscriptions_t::iterator it = topics.find (subscription); - if (it == topics.end ()) { + subscriptions_t::iterator it = subscriptions.find ( + std::string ((const char*) optval_, optvallen_)); + if (it == subscriptions.end ()) { errno = EINVAL; return -1; } - topics.erase (it); + subscriptions.erase (it); } return 0; } @@ -124,44 +115,37 @@ int zmq::sub_t::xflush () int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_) { - while (true) { - - // Get a message using fair queueing algorithm. - int rc = fq.recv (msg_, flags_); + // Get a message using fair queueing algorithm. + int rc = fq.recv (msg_, flags_); - // If there's no message available, return immediately. - if (rc != 0 && errno == EAGAIN) - return -1; + // If there's no message available, return immediately. + if (rc != 0 && errno == EAGAIN) + return -1; - // If there is no subscription return -1/EAGAIN. - if (!all_count && prefixes.empty () && topics.empty ()) { - errno = EAGAIN; - return -1; - } - - // If there is at least one "*" subscription, the message matches. - if (all_count) - return 0; + // If there is at least one * subscription, the message matches. + if (all_count) + return 0; - // Check the message format. - // TODO: We should either ignore the message or drop the connection - // if the message doesn't conform with the expected format. - unsigned char *data = (unsigned char*) zmq_msg_data (msg_); - zmq_assert (*data <= zmq_msg_size (msg_) - 1); - std::string topic ((const char*) (data + 1), *data); - - // Check whether the message matches at least one prefix subscription. - for (subscriptions_t::iterator it = prefixes.begin (); - it != prefixes.end (); it++) - if (it->size () <= topic.size () && - *it == topic.substr (0, it->size ())) - return 0; - - // Check whether the message matches an exact match subscription. - subscriptions_t::iterator it = topics.find (topic); - if (it != topics.end ()) + // Check whether the message matches at least one prefix subscription. + // TODO: Make this efficient - O(log(n)) where n is number of characters in + // the longest subscription string. + for (subscriptions_t::iterator it = subscriptions.begin (); + it != subscriptions.end (); it++) { + size_t msg_size = zmq_msg_size (msg_); + size_t sub_size = it->size (); + if (sub_size <= msg_size && + memcmp (zmq_msg_data (msg_), it->data (), sub_size) == 0) return 0; } + + // The message did not pass the filter. Trim it. + // Note that we are returning a different error code so that the caller + // knows there are more messages available. We cannot loop here as + // a stream of non-matching messages would create a DoS situation. + zmq_msg_close (msg_); + zmq_msg_init (msg_); + errno = EINPROGRESS; + return -1; } bool zmq::sub_t::xhas_in () -- cgit v1.2.3