diff options
author | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-12-28 21:29:31 +0100 |
---|---|---|
committer | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-12-28 21:29:31 +0100 |
commit | 72161fb075025410312c6735d681c3de9a36a4e5 (patch) | |
tree | d1db0ed04875ccb6b385238abe11a9375ee10f91 | |
parent | c97967ed4b70de700db38cc2661bbe43262bc029 (diff) |
format of subscriptions changed (no * needed anymore)
-rw-r--r-- | src/socket_base.cpp | 11 | ||||
-rw-r--r-- | src/sub.cpp | 90 | ||||
-rw-r--r-- | src/sub.hpp | 9 | ||||
-rw-r--r-- | src/zmq_engine.cpp | 7 |
4 files changed, 51 insertions, 66 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 43209d5..2348f67 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -326,17 +326,20 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) if (errno != EAGAIN) return -1; app_thread->process_commands (false, false); - ticks = 0; rc = xrecv (msg_, flags_); + ticks = 0; } else { while (rc != 0) { - if (errno != EAGAIN) + if (errno == EINPROGRESS) + app_thread->process_commands (false, true); + else if (errno == EAGAIN) + app_thread->process_commands (true, false); + else return -1; - app_thread->process_commands (true, false); - ticks = 0; rc = xrecv (msg_, flags_); } + ticks = 0; } diff --git a/src/sub.cpp b/src/sub.cpp index e5dbe76..95039d7 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <string.h> + #include "../bindings/c/zmq.h" #include "sub.hpp" @@ -67,41 +69,30 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { if (option_ == ZMQ_SUBSCRIBE) { - std::string subscription ((const char*) optval_, optvallen_); - if (subscription == "*") + if (!optvallen_) all_count++; - else if (subscription [subscription.size () - 1] == '*') - prefixes.insert (subscription.substr (0, subscription.size () - 1)); - else - topics.insert (subscription); + else + subscriptions.insert (std::string ((const char*) optval_, + optvallen_)); return 0; } if (option_ == ZMQ_UNSUBSCRIBE) { - std::string subscription ((const char*) optval_, optvallen_); - if (subscription == "*") { + if (!optvallen_) { if (!all_count) { errno = EINVAL; return -1; } all_count--; } - else if (subscription [subscription.size () - 1] == '*') { - subscriptions_t::iterator it = prefixes.find ( - subscription.substr (0, subscription.size () - 1)); - if (it == prefixes.end ()) { - errno = EINVAL; - return -1; - } - prefixes.erase (it); - } else { - subscriptions_t::iterator it = topics.find (subscription); - if (it == topics.end ()) { + subscriptions_t::iterator it = subscriptions.find ( + std::string ((const char*) optval_, optvallen_)); + if (it == subscriptions.end ()) { errno = EINVAL; return -1; } - topics.erase (it); + subscriptions.erase (it); } return 0; } @@ -124,44 +115,37 @@ int zmq::sub_t::xflush () int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_) { - while (true) { - - // Get a message using fair queueing algorithm. - int rc = fq.recv (msg_, flags_); + // Get a message using fair queueing algorithm. + int rc = fq.recv (msg_, flags_); - // If there's no message available, return immediately. - if (rc != 0 && errno == EAGAIN) - return -1; + // If there's no message available, return immediately. + if (rc != 0 && errno == EAGAIN) + return -1; - // If there is no subscription return -1/EAGAIN. - if (!all_count && prefixes.empty () && topics.empty ()) { - errno = EAGAIN; - return -1; - } - - // If there is at least one "*" subscription, the message matches. - if (all_count) - return 0; + // If there is at least one * subscription, the message matches. + if (all_count) + return 0; - // Check the message format. - // TODO: We should either ignore the message or drop the connection - // if the message doesn't conform with the expected format. - unsigned char *data = (unsigned char*) zmq_msg_data (msg_); - zmq_assert (*data <= zmq_msg_size (msg_) - 1); - std::string topic ((const char*) (data + 1), *data); - - // Check whether the message matches at least one prefix subscription. - for (subscriptions_t::iterator it = prefixes.begin (); - it != prefixes.end (); it++) - if (it->size () <= topic.size () && - *it == topic.substr (0, it->size ())) - return 0; - - // Check whether the message matches an exact match subscription. - subscriptions_t::iterator it = topics.find (topic); - if (it != topics.end ()) + // Check whether the message matches at least one prefix subscription. + // TODO: Make this efficient - O(log(n)) where n is number of characters in + // the longest subscription string. + for (subscriptions_t::iterator it = subscriptions.begin (); + it != subscriptions.end (); it++) { + size_t msg_size = zmq_msg_size (msg_); + size_t sub_size = it->size (); + if (sub_size <= msg_size && + memcmp (zmq_msg_data (msg_), it->data (), sub_size) == 0) return 0; } + + // The message did not pass the filter. Trim it. + // Note that we are returning a different error code so that the caller + // knows there are more messages available. We cannot loop here as + // a stream of non-matching messages would create a DoS situation. + zmq_msg_close (msg_); + zmq_msg_init (msg_); + errno = EINPROGRESS; + return -1; } bool zmq::sub_t::xhas_in () diff --git a/src/sub.hpp b/src/sub.hpp index 1eafdac..a7cd134 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -56,16 +56,11 @@ namespace zmq // Fair queueing object for inbound pipes. fq_t fq; - // Number of active "*" subscriptions. + // Number of active * subscriptions. int all_count; typedef std::multiset <std::string> subscriptions_t; - - // List of all prefix subscriptions. - subscriptions_t prefixes; - - // List of all exact match subscriptions. - subscriptions_t topics; + subscriptions_t subscriptions; sub_t (const sub_t&); void operator = (const sub_t&); diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index cfc87a7..b568993 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -101,8 +101,11 @@ void zmq::zmq_engine_t::in_event () insize -= processed; // Stop polling for input if we got stuck. - if (processed < insize) - reset_pollin (handle); + if (processed < insize) { + zmq_assert (false); + // TODO: This may happen is queue limits are implemented. + // reset_pollin (handle); + } // Flush all messages the decoder may have produced. inout->flush (); |