diff options
| -rw-r--r-- | src/socket_base.cpp | 11 | ||||
| -rw-r--r-- | src/sub.cpp | 90 | ||||
| -rw-r--r-- | src/sub.hpp | 9 | ||||
| -rw-r--r-- | src/zmq_engine.cpp | 7 | 
4 files changed, 51 insertions, 66 deletions
| diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 43209d5..2348f67 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -326,17 +326,20 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)          if (errno != EAGAIN)              return -1;          app_thread->process_commands (false, false); -        ticks = 0;          rc = xrecv (msg_, flags_); +        ticks = 0;      }      else  {          while (rc != 0) { -            if (errno != EAGAIN) +            if (errno == EINPROGRESS) +                app_thread->process_commands (false, true); +            else if (errno == EAGAIN) +                app_thread->process_commands (true, false); +            else                  return -1; -            app_thread->process_commands (true, false); -            ticks = 0;              rc = xrecv (msg_, flags_);          } +        ticks = 0;      } diff --git a/src/sub.cpp b/src/sub.cpp index e5dbe76..95039d7 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -17,6 +17,8 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ +#include <string.h> +  #include "../bindings/c/zmq.h"  #include "sub.hpp" @@ -67,41 +69,30 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,      size_t optvallen_)  {      if (option_ == ZMQ_SUBSCRIBE) { -        std::string subscription ((const char*) optval_, optvallen_); -        if (subscription == "*") +        if (!optvallen_)              all_count++; -        else if (subscription [subscription.size () - 1] == '*') -            prefixes.insert (subscription.substr (0, subscription.size () - 1)); -        else -            topics.insert (subscription); +        else  +            subscriptions.insert (std::string ((const char*) optval_, +                optvallen_));          return 0;      }      if (option_ == ZMQ_UNSUBSCRIBE) { -        std::string subscription ((const char*) optval_, optvallen_); -        if (subscription == "*") { +        if (!optvallen_) {              if (!all_count) {                  errno = EINVAL;                  return -1;              }              all_count--;          } -        else if (subscription [subscription.size () - 1] == '*') { -            subscriptions_t::iterator it = prefixes.find ( -                subscription.substr (0, subscription.size () - 1)); -            if (it == prefixes.end ()) { -                errno = EINVAL; -                return -1; -            } -            prefixes.erase (it); -        }          else { -            subscriptions_t::iterator it = topics.find (subscription); -            if (it == topics.end ()) { +            subscriptions_t::iterator it = subscriptions.find ( +                std::string ((const char*) optval_, optvallen_)); +            if (it == subscriptions.end ()) {                  errno = EINVAL;                  return -1;              } -            topics.erase (it); +            subscriptions.erase (it);          }          return 0;      } @@ -124,44 +115,37 @@ int zmq::sub_t::xflush ()  int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)  { -    while (true) { - -        //  Get a message using fair queueing algorithm. -        int rc = fq.recv (msg_, flags_); +    //  Get a message using fair queueing algorithm. +    int rc = fq.recv (msg_, flags_); -        //  If there's no message available, return immediately. -        if (rc != 0 && errno == EAGAIN) -            return -1; +    //  If there's no message available, return immediately. +    if (rc != 0 && errno == EAGAIN) +        return -1; -        //  If there is no subscription return -1/EAGAIN. -        if (!all_count && prefixes.empty () && topics.empty ()) { -            errno = EAGAIN; -            return -1;  -        } - -        //  If there is at least one "*" subscription, the message matches. -        if (all_count) -            return 0; +    //  If there is at least one * subscription, the message matches. +    if (all_count) +        return 0; -        //  Check the message format. -        //  TODO: We should either ignore the message or drop the connection -        //  if the message doesn't conform with the expected format. -        unsigned char *data = (unsigned char*) zmq_msg_data (msg_); -        zmq_assert (*data <= zmq_msg_size (msg_) - 1); -        std::string topic ((const char*) (data + 1), *data); - -        //  Check whether the message matches at least one prefix subscription. -        for (subscriptions_t::iterator it = prefixes.begin (); -              it != prefixes.end (); it++) -            if (it->size () <= topic.size () && -                  *it == topic.substr (0, it->size ())) -                return 0; - -        //  Check whether the message matches an exact match subscription. -        subscriptions_t::iterator it = topics.find (topic); -        if (it != topics.end ()) +    //  Check whether the message matches at least one prefix subscription. +    //  TODO: Make this efficient - O(log(n)) where n is number of characters in +    //  the longest subscription string. +    for (subscriptions_t::iterator it = subscriptions.begin (); +          it != subscriptions.end (); it++) { +        size_t msg_size = zmq_msg_size (msg_); +        size_t sub_size = it->size (); +        if (sub_size <= msg_size && +              memcmp (zmq_msg_data (msg_), it->data (), sub_size) == 0)              return 0;      } + +    //  The message did not pass the filter. Trim it. +    //  Note that we are returning a different error code so that the caller +    //  knows there are more messages available. We cannot loop here as +    //  a stream of non-matching messages would create a DoS situation. +    zmq_msg_close (msg_); +    zmq_msg_init (msg_); +    errno = EINPROGRESS; +    return -1;  }  bool zmq::sub_t::xhas_in () diff --git a/src/sub.hpp b/src/sub.hpp index 1eafdac..a7cd134 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -56,16 +56,11 @@ namespace zmq          //  Fair queueing object for inbound pipes.           fq_t fq; -        //  Number of active "*" subscriptions. +        //  Number of active * subscriptions.          int all_count;          typedef std::multiset <std::string> subscriptions_t; - -        //  List of all prefix subscriptions. -        subscriptions_t prefixes; - -        //  List of all exact match subscriptions. -        subscriptions_t topics; +        subscriptions_t subscriptions;          sub_t (const sub_t&);          void operator = (const sub_t&); diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index cfc87a7..b568993 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -101,8 +101,11 @@ void zmq::zmq_engine_t::in_event ()      insize -= processed;      //  Stop polling for input if we got stuck. -    if (processed < insize) -        reset_pollin (handle); +    if (processed < insize) { +        zmq_assert (false); +        //  TODO: This may happen is queue limits are implemented. +        //  reset_pollin (handle); +    }      //  Flush all messages the decoder may have produced.      inout->flush (); | 
