diff options
| -rw-r--r-- | src/Makefile.am | 2 | ||||
| -rw-r--r-- | src/prefix_tree.cpp | 175 | ||||
| -rw-r--r-- | src/prefix_tree.hpp | 55 | ||||
| -rw-r--r-- | src/sub.cpp | 44 | ||||
| -rw-r--r-- | src/sub.hpp | 13 | 
5 files changed, 242 insertions, 47 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 6957ff0..58780e4 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -92,6 +92,7 @@ libzmq_la_SOURCES = app_thread.hpp \      poll.hpp \      poller.hpp \      p2p.hpp \ +    prefix_tree.hpp \      pub.hpp \      rep.hpp \      req.hpp \ @@ -142,6 +143,7 @@ libzmq_la_SOURCES = app_thread.hpp \      pgm_sender.cpp \      pgm_socket.cpp \      p2p.cpp \ +    prefix_tree.cpp \      pipe.cpp \      poll.cpp \      pub.cpp \ diff --git a/src/prefix_tree.cpp b/src/prefix_tree.cpp new file mode 100644 index 0000000..441f85d --- /dev/null +++ b/src/prefix_tree.cpp @@ -0,0 +1,175 @@ +/* +    Copyright (c) 2007-2010 iMatix Corporation + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdlib.h> + +#include <new> +#include <algorithm> + +#include "err.hpp" +#include "prefix_tree.hpp" + +zmq::prefix_tree_t::prefix_tree_t () : +    refcnt (0), +    min (0), +    count (0) +{ +} + +zmq::prefix_tree_t::~prefix_tree_t () +{ +    if (count == 1) +        delete next.node; +    else if (count > 1) { +        for (unsigned char i = 0; i != count; ++i) +            if (next.table [i]) +                delete next.table [i]; +        free (next.table); +    } +} + +void zmq::prefix_tree_t::add (unsigned char *prefix_, size_t size_) +{ +    //  We are at the node corresponding to the prefix. We are done. +    if (!size_) { +        ++refcnt; +        return; +    } + +    unsigned char c = *prefix_; +    if (c < min || c >= min + count) { + +        //  The character is out of range of currently handled +        //  charcters. We have to extend the table. +        if (!count) { +            min = c; +            count = 1; +            next.node = NULL; +        } +        else if (count == 1) { +            unsigned char oldc = min; +            prefix_tree_t *oldp = next.node; +            count = (min < c ? c - min : min - c) + 1; +            next.table = (prefix_tree_t**) +                malloc (sizeof (prefix_tree_t*) * count); +            zmq_assert (next.table); +            for (unsigned char i = 0; i != count; ++i) +                next.table [i] = 0; +            min = std::min (min, c); +            next.table [oldc - min] = oldp; +        } +        else if (min < c) { + +            //  The new character is above the current character range. +            unsigned char old_count = count; +            count = c - min + 1; +            next.table = (prefix_tree_t**) realloc ((void*) next.table, +                sizeof (prefix_tree_t*) * count); +            zmq_assert (next.table); +            for (unsigned char i = old_count; i != count; i++) +                next.table [i] = NULL; +        } +        else { + +            //  The new character is below the current character range. +            unsigned char old_count = count; +            count = (min + old_count) - c; +            next.table = (prefix_tree_t**) realloc ((void*) next.table, +                sizeof (prefix_tree_t*) * count); +            zmq_assert (next.table); +            memmove (next.table + min - c, next.table, +                old_count * sizeof (prefix_tree_t*)); +            for (unsigned char i = 0; i != min - c; i++) +                next.table [i] = NULL; +            min = c; +        } +    } + +    //  If next node does not exist, create one. +    if (count == 1) { +        if (!next.node) { +            next.node = new (std::nothrow) prefix_tree_t; +            zmq_assert (next.node); +        } +        next.node->add (prefix_ + 1, size_ - 1); +    } +    else { +        if (!next.table [c - min]) { +            next.table [c - min] = new (std::nothrow) prefix_tree_t; +            zmq_assert (next.table [c - min]); +        } +        next.table [c - min]->add (prefix_ + 1, size_ - 1); +    } +} + +bool zmq::prefix_tree_t::rm (unsigned char *prefix_, size_t size_) +{ +     if (!size_) { +         if (!refcnt) +             return false; +         refcnt--; +         return true; +     } + +     unsigned char c = *prefix_; +     if (!count || c < min || c >= min + count) +         return false; + +     prefix_tree_t *next_node = +         count == 1 ? next.node : next.table [c - min]; + +     if (!next_node) +         return false; + +     return next_node->rm (prefix_ + 1, size_ - 1); +} + +bool zmq::prefix_tree_t::check (unsigned char *data_, size_t size_) +{ +    //  This function is on critical path. It deliberately doesn't use +    //  recursion to get a bit better performance. +    prefix_tree_t *current = this; +    while (true) { + +        //  We've found a corresponding subscription! +        if (current->refcnt) +            return true; + +        //  We've checked all the data and haven't found matching subscription. +        if (!size_) +            return false; + +        //  If there's no corresponding slot for the first character +        //  of the prefix, the message does not match. +        unsigned char c = *data_; +        if (c < current->min || c >= current->min + current->count) +            return false; + +        //  Move to the next character. +        if (current->count == 1) +            current = current->next.node; +        else { +            current = current->next.table [c - current->min]; +            if (!current) +                return false; +        } +        data_++; +        size_--; +    } +} diff --git a/src/prefix_tree.hpp b/src/prefix_tree.hpp new file mode 100644 index 0000000..53c7c18 --- /dev/null +++ b/src/prefix_tree.hpp @@ -0,0 +1,55 @@ +/* +    Copyright (c) 2007-2010 iMatix Corporation + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_PREFIX_TREE_HPP_INCLUDED__ +#define __ZMQ_PREFIX_TREE_HPP_INCLUDED__ + +#include <stddef.h> + +#include "stdint.hpp" + +namespace zmq +{ + +    class prefix_tree_t +    { +    public: + +        prefix_tree_t (); +        ~prefix_tree_t (); + +        void add (unsigned char *prefix_, size_t size_); +        bool rm (unsigned char *prefix_, size_t size_); +        bool check (unsigned char *data_, size_t size_); + +    private: + +        uint32_t refcnt; +        unsigned char min; +        unsigned char count; +        union { +            class prefix_tree_t *node; +            class prefix_tree_t **table; +        } next; +    }; + +} + +#endif + diff --git a/src/sub.cpp b/src/sub.cpp index b8e3990..06ed896 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -26,7 +26,6 @@  zmq::sub_t::sub_t (class app_thread_t *parent_) :      socket_base_t (parent_), -    all_count (0),      has_message (false)  {      options.requires_in = true; @@ -72,30 +71,14 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,      size_t optvallen_)  {      if (option_ == ZMQ_SUBSCRIBE) { -        if (!optvallen_) -            all_count++; -        else  -            subscriptions.insert (std::string ((const char*) optval_, -                optvallen_)); +        subscriptions.add ((unsigned char*) optval_, optvallen_);          return 0;      }      if (option_ == ZMQ_UNSUBSCRIBE) { -        if (!optvallen_) { -            if (!all_count) { -                errno = EINVAL; -                return -1; -            } -            all_count--; -        } -        else { -            subscriptions_t::iterator it = subscriptions.find ( -                std::string ((const char*) optval_, optvallen_)); -            if (it == subscriptions.end ()) { -                errno = EINVAL; -                return -1; -            } -            subscriptions.erase (it); +        if (!subscriptions.rm ((unsigned char*) optval_, optvallen_)) { +            errno = EINVAL; +            return -1;          }          return 0;      } @@ -181,21 +164,6 @@ bool zmq::sub_t::xhas_out ()  bool zmq::sub_t::match (zmq_msg_t *msg_)  { -    //  If there is at least one * subscription, the message matches. -    if (all_count) -        return true; - -    //  Check whether the message matches at least one prefix subscription. -    //  TODO: Make this efficient - O(log(n)) where n is number of characters in -    //  the longest subscription string. -    for (subscriptions_t::iterator it = subscriptions.begin (); -          it != subscriptions.end (); it++) { -        size_t msg_size = zmq_msg_size (msg_); -        size_t sub_size = it->size (); -        if (sub_size <= msg_size && -              memcmp (zmq_msg_data (msg_), it->data (), sub_size) == 0) -            return true; -    } - -    return false; +    return subscriptions.check ((unsigned char*) zmq_msg_data (msg_), +        zmq_msg_size (msg_));  } diff --git a/src/sub.hpp b/src/sub.hpp index 2044442..9e7d6cc 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -20,11 +20,9 @@  #ifndef __ZMQ_SUB_HPP_INCLUDED__  #define __ZMQ_SUB_HPP_INCLUDED__ -#include <set> -#include <string> -  #include "../bindings/c/zmq.h" +#include "prefix_tree.hpp"  #include "socket_base.hpp"  #include "fq.hpp" @@ -59,13 +57,10 @@ namespace zmq          bool match (zmq_msg_t *msg_);          //  Fair queueing object for inbound pipes. -         fq_t fq; - -        //  Number of active * subscriptions. -        int all_count; +        fq_t fq; -        typedef std::multiset <std::string> subscriptions_t; -        subscriptions_t subscriptions; +        //  The repository of subscriptions. +        prefix_tree_t subscriptions;          //  If true, 'message' contains a matching message to return on the          //  next recv call.  | 
