From 00b9a5dedeb47efd78aea59b579a4b5befba743a Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 31 Jan 2010 20:14:30 +0100 Subject: ZMQII-51: Implement O(1) topic matching --- src/Makefile.am | 2 + src/prefix_tree.cpp | 175 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/prefix_tree.hpp | 55 +++++++++++++++++ src/sub.cpp | 44 ++----------- src/sub.hpp | 13 ++-- 5 files changed, 242 insertions(+), 47 deletions(-) create mode 100644 src/prefix_tree.cpp create mode 100644 src/prefix_tree.hpp diff --git a/src/Makefile.am b/src/Makefile.am index 6957ff0..58780e4 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -92,6 +92,7 @@ libzmq_la_SOURCES = app_thread.hpp \ poll.hpp \ poller.hpp \ p2p.hpp \ + prefix_tree.hpp \ pub.hpp \ rep.hpp \ req.hpp \ @@ -142,6 +143,7 @@ libzmq_la_SOURCES = app_thread.hpp \ pgm_sender.cpp \ pgm_socket.cpp \ p2p.cpp \ + prefix_tree.cpp \ pipe.cpp \ poll.cpp \ pub.cpp \ diff --git a/src/prefix_tree.cpp b/src/prefix_tree.cpp new file mode 100644 index 0000000..441f85d --- /dev/null +++ b/src/prefix_tree.cpp @@ -0,0 +1,175 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/ + +#include <stdlib.h> + +#include <new> +#include <algorithm> + +#include "err.hpp" +#include "prefix_tree.hpp" + +zmq::prefix_tree_t::prefix_tree_t () : + refcnt (0), + min (0), + count (0) +{ +} + +zmq::prefix_tree_t::~prefix_tree_t () +{ + if (count == 1) + delete next.node; + else if (count > 1) { + for (unsigned char i = 0; i != count; ++i) + if (next.table [i]) + delete next.table [i]; + free (next.table); + } +} + +void zmq::prefix_tree_t::add (unsigned char *prefix_, size_t size_) +{ + // We are at the node corresponding to the prefix. We are done. + if (!size_) { + ++refcnt; + return; + } + + unsigned char c = *prefix_; + if (c < min || c >= min + count) { + + // The character is out of range of currently handled + // characters. We have to extend the table. + if (!count) { + min = c; + count = 1; + next.node = NULL; + } + else if (count == 1) { + unsigned char oldc = min; + prefix_tree_t *oldp = next.node; + count = (min < c ? c - min : min - c) + 1; + next.table = (prefix_tree_t**) + malloc (sizeof (prefix_tree_t*) * count); + zmq_assert (next.table); + for (unsigned char i = 0; i != count; ++i) + next.table [i] = 0; + min = std::min (min, c); + next.table [oldc - min] = oldp; + } + else if (min < c) { + + // The new character is above the current character range. + unsigned char old_count = count; + count = c - min + 1; + next.table = (prefix_tree_t**) realloc ((void*) next.table, + sizeof (prefix_tree_t*) * count); + zmq_assert (next.table); + for (unsigned char i = old_count; i != count; i++) + next.table [i] = NULL; + } + else { + + // The new character is below the current character range.
+ unsigned char old_count = count; + count = (min + old_count) - c; + next.table = (prefix_tree_t**) realloc ((void*) next.table, + sizeof (prefix_tree_t*) * count); + zmq_assert (next.table); + memmove (next.table + min - c, next.table, + old_count * sizeof (prefix_tree_t*)); + for (unsigned char i = 0; i != min - c; i++) + next.table [i] = NULL; + min = c; + } + } + + // If next node does not exist, create one. + if (count == 1) { + if (!next.node) { + next.node = new (std::nothrow) prefix_tree_t; + zmq_assert (next.node); + } + next.node->add (prefix_ + 1, size_ - 1); + } + else { + if (!next.table [c - min]) { + next.table [c - min] = new (std::nothrow) prefix_tree_t; + zmq_assert (next.table [c - min]); + } + next.table [c - min]->add (prefix_ + 1, size_ - 1); + } +} + +bool zmq::prefix_tree_t::rm (unsigned char *prefix_, size_t size_) +{ + if (!size_) { + if (!refcnt) + return false; + refcnt--; + return true; + } + + unsigned char c = *prefix_; + if (!count || c < min || c >= min + count) + return false; + + prefix_tree_t *next_node = + count == 1 ? next.node : next.table [c - min]; + + if (!next_node) + return false; + + return next_node->rm (prefix_ + 1, size_ - 1); +} + +bool zmq::prefix_tree_t::check (unsigned char *data_, size_t size_) +{ + // This function is on critical path. It deliberately doesn't use + // recursion to get a bit better performance. + prefix_tree_t *current = this; + while (true) { + + // We've found a corresponding subscription! + if (current->refcnt) + return true; + + // We've checked all the data and haven't found matching subscription. + if (!size_) + return false; + + // If there's no corresponding slot for the first character + // of the prefix, the message does not match. + unsigned char c = *data_; + if (c < current->min || c >= current->min + current->count) + return false; + + // Move to the next character. 
+ if (current->count == 1) + current = current->next.node; + else { + current = current->next.table [c - current->min]; + if (!current) + return false; + } + data_++; + size_--; + } +} diff --git a/src/prefix_tree.hpp b/src/prefix_tree.hpp new file mode 100644 index 0000000..53c7c18 --- /dev/null +++ b/src/prefix_tree.hpp @@ -0,0 +1,55 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/ + +#ifndef __ZMQ_PREFIX_TREE_HPP_INCLUDED__ +#define __ZMQ_PREFIX_TREE_HPP_INCLUDED__ + +#include <stddef.h> + +#include "stdint.hpp" + +namespace zmq +{ + + class prefix_tree_t + { + public: + + prefix_tree_t (); + ~prefix_tree_t (); + + void add (unsigned char *prefix_, size_t size_); + bool rm (unsigned char *prefix_, size_t size_); + bool check (unsigned char *data_, size_t size_); + + private: + + uint32_t refcnt; + unsigned char min; + unsigned char count; + union { + class prefix_tree_t *node; + class prefix_tree_t **table; + } next; + }; + +} + +#endif + diff --git a/src/sub.cpp b/src/sub.cpp index b8e3990..06ed896 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -26,7 +26,6 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) : socket_base_t (parent_), - all_count (0), has_message (false) { options.requires_in = true; @@ -72,30 +71,14 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { if (option_ == ZMQ_SUBSCRIBE) { - if (!optvallen_) - all_count++; - else - subscriptions.insert (std::string ((const char*) optval_, - optvallen_)); + subscriptions.add ((unsigned char*) optval_, optvallen_); return 0; } if (option_ == ZMQ_UNSUBSCRIBE) { - if (!optvallen_) { - if (!all_count) { - errno = EINVAL; - return -1; - } - all_count--; - } - else { - subscriptions_t::iterator it = subscriptions.find ( - std::string ((const char*) optval_, optvallen_)); - if (it == subscriptions.end ()) { - errno = EINVAL; - return -1; - } - subscriptions.erase (it); + if (!subscriptions.rm ((unsigned char*) optval_, optvallen_)) { + errno = EINVAL; + return -1; } return 0; } @@ -181,21 +164,6 @@ bool zmq::sub_t::xhas_out () bool zmq::sub_t::match (zmq_msg_t *msg_) { - // If there is at least one * subscription, the message matches. - if (all_count) - return true; - - // Check whether the message matches at least one prefix subscription. - // TODO: Make this efficient - O(log(n)) where n is number of characters in - // the longest subscription string.
- for (subscriptions_t::iterator it = subscriptions.begin (); - it != subscriptions.end (); it++) { - size_t msg_size = zmq_msg_size (msg_); - size_t sub_size = it->size (); - if (sub_size <= msg_size && - memcmp (zmq_msg_data (msg_), it->data (), sub_size) == 0) - return true; - } - - return false; + return subscriptions.check ((unsigned char*) zmq_msg_data (msg_), + zmq_msg_size (msg_)); } diff --git a/src/sub.hpp b/src/sub.hpp index 2044442..9e7d6cc 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -20,11 +20,9 @@ #ifndef __ZMQ_SUB_HPP_INCLUDED__ #define __ZMQ_SUB_HPP_INCLUDED__ -#include <set> -#include <string> - #include "../bindings/c/zmq.h" +#include "prefix_tree.hpp" #include "socket_base.hpp" #include "fq.hpp" @@ -59,13 +57,10 @@ namespace zmq bool match (zmq_msg_t *msg_); // Fair queueing object for inbound pipes. - fq_t fq; - - // Number of active * subscriptions. - int all_count; + fq_t fq; - typedef std::multiset <std::string> subscriptions_t; - subscriptions_t subscriptions; + // The repository of subscriptions. + prefix_tree_t subscriptions; // If true, 'message' contains a matching message to return on the // next recv call. -- cgit v1.2.3