From d82cbb3a81f116cd22e9895ecac36ac3d7b38929 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 5 Apr 2012 07:32:58 +0200 Subject: XS_PLUGIN and XS_FILTER implementation This patch introduces following features: - XS_PLUGIN context option to add plugins to libxs - XS_FILTER option to switch between different filter types - Automatic loading of plug-ins is *not* implemented. From the implementation point of view: - standard prefix filter is implemented as a pluggable filter - trie_t and mtrie_t are joined into a single class - the code for 0MQ/3.1 compatibility is left in in the form of comments - new test for testing re-subscriptions is added Signed-off-by: Martin Sustrik --- src/mtrie.cpp | 436 ---------------------------------------------------------- 1 file changed, 436 deletions(-) delete mode 100644 src/mtrie.cpp (limited to 'src/mtrie.cpp') diff --git a/src/mtrie.cpp b/src/mtrie.cpp deleted file mode 100644 index eae34c2..0000000 --- a/src/mtrie.cpp +++ /dev/null @@ -1,436 +0,0 @@ -/* - Copyright (c) 2011-2012 250bpm s.r.o. - Copyright (c) 2011-2012 Spotify AB - Copyright (c) 2011 Other contributors as noted in the AUTHORS file - - This file is part of Crossroads I/O project. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - Crossroads is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include - -#include -#include - -#include "platform.hpp" -#if defined XS_HAVE_WINDOWS -#include "windows.hpp" -#endif - -#include "err.hpp" -#include "pipe.hpp" -#include "mtrie.hpp" - -xs::mtrie_t::mtrie_t () : - pipes (0), - min (0), - count (0), - live_nodes (0) -{ -} - -xs::mtrie_t::~mtrie_t () -{ - if (pipes) { - delete pipes; - pipes = 0; - } - - if (count == 1) { - xs_assert (next.node); - delete next.node; - next.node = 0; - } - else if (count > 1) { - for (unsigned short i = 0; i != count; ++i) - if (next.table [i]) - delete next.table [i]; - free (next.table); - } -} - -bool xs::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_) -{ - return add_helper (prefix_, size_, pipe_); -} - -bool xs::mtrie_t::add_helper (unsigned char *prefix_, size_t size_, - pipe_t *pipe_) -{ - // We are at the node corresponding to the prefix. We are done. - if (!size_) { - bool result = !pipes; - if (!pipes) - pipes = new pipes_t; - pipes->insert (pipe_); - return result; - } - - unsigned char c = *prefix_; - if (c < min || c >= min + count) { - - // The character is out of range of currently handled - // charcters. We have to extend the table. - if (!count) { - min = c; - count = 1; - next.node = NULL; - } - else if (count == 1) { - unsigned char oldc = min; - mtrie_t *oldp = next.node; - count = (min < c ? c - min : min - c) + 1; - next.table = (mtrie_t**) - malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - for (unsigned short i = 0; i != count; ++i) - next.table [i] = 0; - min = std::min (min, c); - next.table [oldc - min] = oldp; - } - else if (min < c) { - - // The new character is above the current character range. - unsigned short old_count = count; - count = c - min + 1; - next.table = (mtrie_t**) realloc ((void*) next.table, - sizeof (mtrie_t*) * count); - xs_assert (next.table); - for (unsigned short i = old_count; i != count; i++) - next.table [i] = NULL; - } - else { - - // The new character is below the current character range. - unsigned short old_count = count; - count = (min + old_count) - c; - next.table = (mtrie_t**) realloc ((void*) next.table, - sizeof (mtrie_t*) * count); - xs_assert (next.table); - memmove (next.table + min - c, next.table, - old_count * sizeof (mtrie_t*)); - for (unsigned short i = 0; i != min - c; i++) - next.table [i] = NULL; - min = c; - } - } - - // If next node does not exist, create one. - if (count == 1) { - if (!next.node) { - next.node = new (std::nothrow) mtrie_t; - ++live_nodes; - xs_assert (next.node); - } - return next.node->add_helper (prefix_ + 1, size_ - 1, pipe_); - } - else { - if (!next.table [c - min]) { - next.table [c - min] = new (std::nothrow) mtrie_t; - ++live_nodes; - xs_assert (next.table [c - min]); - } - return next.table [c - min]->add_helper (prefix_ + 1, size_ - 1, pipe_); - } -} - - -void xs::mtrie_t::rm (pipe_t *pipe_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_) -{ - unsigned char *buff = NULL; - rm_helper (pipe_, &buff, 0, 0, func_, arg_); - free (buff); -} - -void xs::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, - size_t buffsize_, size_t maxbuffsize_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_) -{ - // Remove the subscription from this node. - if (pipes && pipes->erase (pipe_) && pipes->empty ()) { - func_ (*buff_, buffsize_, arg_); - delete pipes; - pipes = 0; - } - - // Adjust the buffer. - if (buffsize_ >= maxbuffsize_) { - maxbuffsize_ = buffsize_ + 256; - *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); - alloc_assert (*buff_); - } - - // If there are no subnodes in the trie, return. - if (count == 0) - return; - - // If there's one subnode (optimisation). - if (count == 1) { - (*buff_) [buffsize_] = min; - buffsize_++; - next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, - func_, arg_); - - // Prune the node if it was made redundant by the removal - if (next.node->is_redundant ()) { - delete next.node; - next.node = 0; - count = 0; - --live_nodes; - xs_assert (live_nodes == 0); - } - return; - } - - // If there are multiple subnodes. - // - // New min non-null character in the node table after the removal - unsigned char new_min = min + count - 1; - // New max non-null character in the node table after the removal - unsigned char new_max = min; - for (unsigned short c = 0; c != count; c++) { - (*buff_) [buffsize_] = min + c; - if (next.table [c]) { - next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1, - maxbuffsize_, func_, arg_); - - // Prune redundant nodes from the mtrie - if (next.table [c]->is_redundant ()) { - delete next.table [c]; - next.table [c] = 0; - - xs_assert (live_nodes > 0); - --live_nodes; - } - else { - // The node is not redundant, so it's a candidate for being - // the new min/max node. - // - // We loop through the node array from left to right, so the - // first non-null, non-redundant node encountered is the new - // minimum index. Conversely, the last non-redundant, non-null - // node encountered is the new maximum index. - if (c + min < new_min) - new_min = c + min; - if (c + min > new_max) - new_max = c + min; - } - } - } - - xs_assert (count > 1); - - // Compact the node table if possible - if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - xs_assert (new_min == new_max); - xs_assert (new_min >= min && new_min < min + count); - mtrie_t *node = next.table [new_min - min]; - xs_assert (node); - free (next.table); - next.node = node; - count = 1; - min = new_min; - } - else if (live_nodes > 1 && (new_min > min || new_max < min + count - 1)) { - xs_assert (new_max - new_min + 1 > 1); - - mtrie_t **old_table = next.table; - xs_assert (new_min > min || new_max < min + count - 1); - xs_assert (new_min >= min); - xs_assert (new_max <= min + count - 1); - xs_assert (new_max - new_min + 1 < count); - - count = new_max - new_min + 1; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - - memmove (next.table, old_table + (new_min - min), - sizeof (mtrie_t*) * count); - free (old_table); - - min = new_min; - } -} - -bool xs::mtrie_t::rm (unsigned char *prefix_, size_t size_, pipe_t *pipe_) -{ - return rm_helper (prefix_, size_, pipe_); -} - -bool xs::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_, - pipe_t *pipe_) -{ - if (!size_) { - if (pipes) { - pipes_t::size_type erased = pipes->erase (pipe_); - xs_assert (erased == 1); - if (pipes->empty ()) { - delete pipes; - pipes = 0; - } - } - return !pipes; - } - - unsigned char c = *prefix_; - if (!count || c < min || c >= min + count) - return false; - - mtrie_t *next_node = - count == 1 ? next.node : next.table [c - min]; - - if (!next_node) - return false; - - bool ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_); - - if (next_node->is_redundant ()) { - delete next_node; - xs_assert (count > 0); - - if (count == 1) { - next.node = 0; - count = 0; - --live_nodes; - xs_assert (live_nodes == 0); - } - else { - next.table [c - min] = 0; - xs_assert (live_nodes > 1); - --live_nodes; - - // Compact the table if possible - if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - mtrie_t *node = 0; - for (unsigned short i = 0; i < count; ++i) { - if (next.table [i]) { - node = next.table [i]; - min = i + min; - break; - } - } - - xs_assert (node); - free (next.table); - next.node = node; - count = 1; - } - else if (c == min) { - // We can compact the table "from the left" - unsigned char new_min = min; - for (unsigned short i = 1; i < count; ++i) { - if (next.table [i]) { - new_min = i + min; - break; - } - } - xs_assert (new_min != min); - - mtrie_t **old_table = next.table; - xs_assert (new_min > min); - xs_assert (count > new_min - min); - - count = count - (new_min - min); - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - - memmove (next.table, old_table + (new_min - min), - sizeof (mtrie_t*) * count); - free (old_table); - - min = new_min; - } - else if (c == min + count - 1) { - // We can compact the table "from the right" - unsigned short new_count = count; - for (unsigned short i = 1; i < count; ++i) { - if (next.table [count - 1 - i]) { - new_count = count - i; - break; - } - } - xs_assert (new_count != count); - count = new_count; - - mtrie_t **old_table = next.table; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - - memmove (next.table, old_table, sizeof (mtrie_t*) * count); - free (old_table); - } - } - } - - return ret; -} - -void xs::mtrie_t::match (unsigned char *data_, size_t size_, - void (*func_) (pipe_t *pipe_, void *arg_), void *arg_) -{ - mtrie_t *current = this; - while (true) { - - // Signal the pipes attached to this node. - if (current->pipes) { - for (pipes_t::iterator it = current->pipes->begin (); - it != current->pipes->end (); ++it) - func_ (*it, arg_); - } - - // If we are at the end of the message, there's nothing more to match. - if (!size_) - break; - - // If there are no subnodes in the trie, return. - if (current->count == 0) - break; - - // If there's one subnode (optimisation). - if (current->count == 1) { - if (data_ [0] != current->min) - break; - current = current->next.node; - data_++; - size_--; - continue; - } - - // If there are multiple subnodes. - if (data_ [0] < current->min || data_ [0] >= - current->min + current->count) - break; - if (!current->next.table [data_ [0] - current->min]) - break; - current = current->next.table [data_ [0] - current->min]; - data_++; - size_--; - } -} - -bool xs::mtrie_t::is_redundant () const -{ - return !pipes && live_nodes == 0; -} - -- cgit v1.2.3