From d82cbb3a81f116cd22e9895ecac36ac3d7b38929 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 5 Apr 2012 07:32:58 +0200 Subject: XS_PLUGIN and XS_FILTER implementation This patch introduces following features: - XS_PLUGIN context option to add plugins to libxs - XS_FILTER option to switch between different filter types - Automatic loading of plug-ins is *not* implemented. From the implementation point of view: - standard prefix filter is implemented as a pluggable filter - trie_t and mtrie_t are joined into a single class - the code for 0MQ/3.1 compatibility is left in in the form of comments - new test for testing re-subscriptions is added Signed-off-by: Martin Sustrik --- src/prefix_filter.cpp | 664 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 664 insertions(+) create mode 100644 src/prefix_filter.cpp (limited to 'src/prefix_filter.cpp') diff --git a/src/prefix_filter.cpp b/src/prefix_filter.cpp new file mode 100644 index 0000000..5d21fb6 --- /dev/null +++ b/src/prefix_filter.cpp @@ -0,0 +1,664 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2011-2012 Spotify AB + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include + +#include "../include/xs.h" + +#include "prefix_filter.hpp" +#include "err.hpp" + +xs::prefix_filter_t::prefix_filter_t () +{ + init (&root); +} + +xs::prefix_filter_t::~prefix_filter_t () +{ + close (&root); +} + +int xs::prefix_filter_t::subscribe (void *core_, void *subscriber_, + const unsigned char *data_, size_t size_) +{ + return add (&root, data_, size_, subscriber_) ? 1 : 0; +} + +int xs::prefix_filter_t::unsubscribe (void *core_, void *subscriber_, + const unsigned char *data_, size_t size_) +{ + return rm (&root, data_, size_, subscriber_) ? 1 : 0; +} + +void xs::prefix_filter_t::unsubscribe_all (void *core_, void *subscriber_) +{ + rm (&root, subscriber_, core_); +} + +void xs::prefix_filter_t::enumerate (void *core_) +{ + unsigned char *buff = NULL; + list (&root, &buff, 0, 0, core_); + free (buff); +} + +int xs::prefix_filter_t::match (void *core_, + const unsigned char *data_, size_t size_) +{ + // This function is on critical path. It deliberately doesn't use + // recursion to get a bit better performance. + node_t *current = &root; + while (true) { + + // We've found a corresponding subscription! + if (current->subscribers) + return 1; + + // We've checked all the data and haven't found matching subscription. + if (!size_) + return 0; + + // If there's no corresponding slot for the first character + // of the prefix, the message does not match. + unsigned char c = *data_; + if (c < current->min || c >= current->min + current->count) + return 0; + + // Move to the next character. + if (current->count == 1) + current = current->next.node; + else { + current = current->next.table [c - current->min]; + if (!current) + return 0; + } + data_++; + size_--; + } + +} + +void xs::prefix_filter_t::match_all (void *core_, + const unsigned char *data_, size_t size_) +{ + node_t *current = &root; + while (true) { + + // Signal the subscribers attached to this node. + if (current->subscribers) { + for (node_t::subscribers_t::iterator it = + current->subscribers->begin (); + it != current->subscribers->end (); ++it) { + int rc = xs_filter_matching (core_, it->first); + errno_assert (rc == 0); + } + } + + // If we are at the end of the message, there's nothing more to match. + if (!size_) + break; + + // If there are no subnodes in the trie, return. + if (current->count == 0) + break; + + // If there's one subnode (optimisation). + if (current->count == 1) { + if (data_ [0] != current->min) + break; + current = current->next.node; + data_++; + size_--; + continue; + } + + // If there are multiple subnodes. + if (data_ [0] < current->min || data_ [0] >= + current->min + current->count) + break; + if (!current->next.table [data_ [0] - current->min]) + break; + current = current->next.table [data_ [0] - current->min]; + data_++; + size_--; + } +} + +void xs::prefix_filter_t::init (node_t *node_) +{ + node_->subscribers = NULL; + node_->min = 0; + node_->count = 0; + node_->live_nodes = 0; +} + +void xs::prefix_filter_t::close (node_t *node_) +{ + if (node_->subscribers) { + delete node_->subscribers; + node_->subscribers = NULL; + } + + if (node_->count == 1) { + xs_assert (node_->next.node); + close (node_->next.node); + delete node_->next.node; + node_->next.node = NULL; + } + else if (node_->count > 1) { + for (unsigned short i = 0; i != node_->count; ++i) + if (node_->next.table [i]) { + close (node_->next.table [i]); + delete node_->next.table [i]; + } + free (node_->next.table); + } +} + +bool xs::prefix_filter_t::add (node_t *node_, const unsigned char *prefix_, + size_t size_, void *subscriber_) +{ + // We are at the node corresponding to the prefix. We are done. + if (!size_) { + bool result = !node_->subscribers; + if (!node_->subscribers) + node_->subscribers = new (std::nothrow) node_t::subscribers_t; + node_t::subscribers_t::iterator it = node_->subscribers->insert ( + node_t::subscribers_t::value_type (subscriber_, 0)).first; + ++it->second; + return result; + } + + unsigned char c = *prefix_; + if (c < node_->min || c >= node_->min + node_->count) { + + // The character is out of range of currently handled + // charcters. We have to extend the table. + if (!node_->count) { + node_->min = c; + node_->count = 1; + node_->next.node = NULL; + } + else if (node_->count == 1) { + unsigned char oldc = node_->min; + node_t *oldp = node_->next.node; + node_->count = + (node_->min < c ? c - node_->min : node_->min - c) + 1; + node_->next.table = (node_t**) + malloc (sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + for (unsigned short i = 0; i != node_->count; ++i) + node_->next.table [i] = 0; + node_->min = std::min (node_->min, c); + node_->next.table [oldc - node_->min] = oldp; + } + else if (node_->min < c) { + + // The new character is above the current character range. + unsigned short old_count = node_->count; + node_->count = c - node_->min + 1; + node_->next.table = (node_t**) realloc ((void*) node_->next.table, + sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + for (unsigned short i = old_count; i != node_->count; i++) + node_->next.table [i] = NULL; + } + else { + + // The new character is below the current character range. + unsigned short old_count = node_->count; + node_->count = (node_->min + old_count) - c; + node_->next.table = (node_t**) realloc ((void*) node_->next.table, + sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + memmove (node_->next.table + node_->min - c, node_->next.table, + old_count * sizeof (node_t*)); + for (unsigned short i = 0; i != node_->min - c; i++) + node_->next.table [i] = NULL; + node_->min = c; + } + } + + // If next node does not exist, create one. + if (node_->count == 1) { + if (!node_->next.node) { + node_->next.node = new (std::nothrow) node_t; + alloc_assert (node_->next.node); + init (node_->next.node); + ++node_->live_nodes; + xs_assert (node_->next.node); + } + return add (node_->next.node, prefix_ + 1, size_ - 1, subscriber_); + } + else { + if (!node_->next.table [c - node_->min]) { + node_->next.table [c - node_->min] = new (std::nothrow) node_t; + alloc_assert (node_->next.table [c - node_->min]); + init (node_->next.table [c - node_->min]); + ++node_->live_nodes; + xs_assert (node_->next.table [c - node_->min]); + } + return add (node_->next.table [c - node_->min], prefix_ + 1, size_ - 1, + subscriber_); + } +} + + +void xs::prefix_filter_t::rm (node_t *node_, void *subscriber_, void *arg_) +{ + unsigned char *buff = NULL; + rm_helper (node_, subscriber_, &buff, 0, 0, arg_); + free (buff); +} + +void xs::prefix_filter_t::rm_helper (node_t *node_, void *subscribers_, + unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_, void *arg_) +{ + // Remove the subscription from this node. + if (node_->subscribers) { + node_t::subscribers_t::iterator it = + node_->subscribers->find (subscribers_); + if (it != node_->subscribers->end ()) { + xs_assert (it->second); + --it->second; + if (!it->second) { + node_->subscribers->erase (it); + if (node_->subscribers->empty ()) { + int rc = xs_filter_unsubscribed (arg_, *buff_, buffsize_); + errno_assert (rc == 0); + delete node_->subscribers; + node_->subscribers = 0; + } + } + } + } + + // Adjust the buffer. + if (buffsize_ >= maxbuffsize_) { + maxbuffsize_ = buffsize_ + 256; + *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); + alloc_assert (*buff_); + } + + // If there are no subnodes in the trie, return. + if (node_->count == 0) + return; + + // If there's one subnode (optimisation). + if (node_->count == 1) { + (*buff_) [buffsize_] = node_->min; + buffsize_++; + rm_helper (node_->next.node, subscribers_, buff_, buffsize_, + maxbuffsize_, arg_); + + // Prune the node if it was made redundant by the removal + if (is_redundant (node_->next.node)) { + close (node_->next.node); + delete node_->next.node; + node_->next.node = 0; + node_->count = 0; + --node_->live_nodes; + xs_assert (node_->live_nodes == 0); + } + return; + } + + // If there are multiple subnodes. + // + // New min non-null character in the node table after the removal. + unsigned char new_min = node_->min; + // New max non-null character in the node table after the removal. + unsigned char new_max = node_->min + node_->count - 1; + for (unsigned short c = 0; c != node_->count; c++) { + (*buff_) [buffsize_] = node_->min + c; + if (node_->next.table [c]) { + rm_helper (node_->next.table [c], subscribers_, buff_, + buffsize_ + 1, maxbuffsize_, arg_); + + // Prune redudant nodes from the the trie. + if (is_redundant (node_->next.table [c])) { + close (node_->next.table [c]); + delete node_->next.table [c]; + node_->next.table [c] = 0; + + xs_assert (node_->live_nodes > 0); + --node_->live_nodes; + } + else { + if (c + node_->min > new_min) + new_min = c + node_->min; + if (c + node_->min < new_max) + new_max = c + node_->min; + } + } + } + + xs_assert (node_->count > 1); + + // Compact the node table if possible. + if (node_->live_nodes == 1) { + + // If there's only one live node in the table we can + // switch to using the more compact single-node + // representation. + xs_assert (new_min == new_max); + xs_assert (new_min >= node_->min && + new_min < node_->min + node_->count); + node_t *node = node_->next.table [new_min - node_->min]; + xs_assert (node); + free (node_->next.table); + node_->next.node = node; + node_->count = 1; + node_->min = new_min; + } + else if (node_->live_nodes > 1 && + (new_min > node_->min || new_max < node_->min + node_->count - 1)) { + xs_assert (new_max - new_min + 1 > 1); + + node_t **old_table = node_->next.table; + xs_assert (new_min > node_->min || + new_max < node_->min + node_->count - 1); + xs_assert (new_min >= node_->min); + xs_assert (new_max <= node_->min + node_->count - 1); + xs_assert (new_max - new_min + 1 < node_->count); + + node_->count = new_max - new_min + 1; + node_->next.table = (node_t**) malloc (sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + + memmove (node_->next.table, old_table + (new_min - node_->min), + sizeof (node_t*) * node_->count); + free (old_table); + + node_->min = new_min; + } +} + +bool xs::prefix_filter_t::rm (node_t *node_, const unsigned char *prefix_, + size_t size_, void *subscriber_) +{ + if (!size_) { + + // Remove the subscription from this node. + if (node_->subscribers) { + node_t::subscribers_t::iterator it = + node_->subscribers->find (subscriber_); + if (it != node_->subscribers->end ()) { + xs_assert (it->second); + --it->second; + if (!it->second) { + node_->subscribers->erase (it); + if (node_->subscribers->empty ()) { + delete node_->subscribers; + node_->subscribers = 0; + } + } + } + } + return !node_->subscribers; + } + + unsigned char c = *prefix_; + if (!node_->count || c < node_->min || c >= node_->min + node_->count) + return false; + + node_t *next_node = node_->count == 1 ? node_->next.node : + node_->next.table [c - node_->min]; + + if (!next_node) + return false; + + bool ret = rm (next_node, prefix_ + 1, size_ - 1, subscriber_); + + if (is_redundant (next_node)) { + close (next_node); + delete next_node; + xs_assert (node_->count > 0); + + if (node_->count == 1) { + node_->next.node = 0; + node_->count = 0; + --node_->live_nodes; + xs_assert (node_->live_nodes == 0); + } + else { + node_->next.table [c - node_->min] = 0; + xs_assert (node_->live_nodes > 1); + --node_->live_nodes; + + // Compact the table if possible. + if (node_->live_nodes == 1) { + // If there's only one live node in the table we can + // switch to using the more compact single-node + // representation + node_t *node = 0; + for (unsigned short i = 0; i < node_->count; ++i) { + if (node_->next.table [i]) { + node = node_->next.table [i]; + node_->min += i; + break; + } + } + + xs_assert (node); + free (node_->next.table); + node_->next.node = node; + node_->count = 1; + } + else if (c == node_->min) { + + // We can compact the table "from the left". + unsigned char new_min = node_->min; + for (unsigned short i = 1; i < node_->count; ++i) { + if (node_->next.table [i]) { + new_min = i + node_->min; + break; + } + } + xs_assert (new_min != node_->min); + + node_t **old_table = node_->next.table; + xs_assert (new_min > node_->min); + xs_assert (node_->count > new_min - node_->min); + + node_->count = node_->count - (new_min - node_->min); + node_->next.table = + (node_t**) malloc (sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + + memmove (node_->next.table, old_table + (new_min - node_->min), + sizeof (node_t*) * node_->count); + free (old_table); + + node_->min = new_min; + } + else if (c == node_->min + node_->count - 1) { + + // We can compact the table "from the right". + unsigned short new_count = node_->count; + for (unsigned short i = 1; i < node_->count; ++i) { + if (node_->next.table [node_->count - 1 - i]) { + new_count = node_->count - i; + break; + } + } + xs_assert (new_count != node_->count); + node_->count = new_count; + + node_t **old_table = node_->next.table; + node_->next.table = + (node_t**) malloc (sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + + memmove (node_->next.table, old_table, + sizeof (node_t*) * node_->count); + free (old_table); + } + } + } + + return ret; +} + +void xs::prefix_filter_t::list (node_t *node_, unsigned char **buff_, + size_t buffsize_, size_t maxbuffsize_, void *arg_) +{ + // If this node is a subscription, apply the function. + if (node_->subscribers) { + int rc = xs_filter_subscribed (arg_, *buff_, buffsize_); + errno_assert (rc == 0); + } + + // Adjust the buffer. + if (buffsize_ >= maxbuffsize_) { + maxbuffsize_ = buffsize_ + 256; + *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); + xs_assert (*buff_); + } + + // If there are no subnodes in the trie, return. + if (node_->count == 0) + return; + + // If there's one subnode (optimisation). + if (node_->count == 1) { + (*buff_) [buffsize_] = node_->min; + buffsize_++; + list (node_->next.node, buff_, buffsize_, maxbuffsize_, arg_); + return; + } + + // If there are multiple subnodes. + for (unsigned short c = 0; c != node_->count; c++) { + (*buff_) [buffsize_] = node_->min + c; + if (node_->next.table [c]) + list (node_->next.table [c], buff_, buffsize_ + 1, maxbuffsize_, + arg_); + } +} + +bool xs::prefix_filter_t::is_redundant (node_t *node_) +{ + return !node_->subscribers && node_->live_nodes == 0; +} + +// Implementation of the C interface of the filter. +// Following functions convert raw C calls into calls to C++ object methods. + +static int id (void *core_) +{ + return XS_FILTER_PREFIX; +} + +static void *pf_create (void *core_) +{ + void *pf = (void*) new (std::nothrow) xs::prefix_filter_t; + alloc_assert (pf); + return pf; +} + +static void pf_destroy (void *core_, void *pf_) +{ + delete (xs::prefix_filter_t*) pf_; +} + +static int pf_subscribe (void *core_, void *pf_, void *subscriber_, + const unsigned char *data_, size_t size_) +{ + return ((xs::prefix_filter_t*) pf_)->subscribe (core_, subscriber_, + data_, size_); +} + +static int pf_unsubscribe (void *core_, void *pf_, void *subscriber_, + const unsigned char *data_, size_t size_) +{ + return ((xs::prefix_filter_t*) pf_)->unsubscribe (core_, subscriber_, + data_, size_); +} + +static void pf_unsubscribe_all (void *core_, void *pf_, void *subscriber_) +{ + ((xs::prefix_filter_t*) pf_)->unsubscribe_all (core_, subscriber_); +} + +static void pf_match (void *core_, void *pf_, + const unsigned char *data_, size_t size_) +{ + ((xs::prefix_filter_t*) pf_)->match_all (core_, data_, size_); +} + +static void *sf_create (void *core_) +{ + void *sf = (void*) new (std::nothrow) xs::prefix_filter_t; + alloc_assert (sf); + return sf; +} + +static void sf_destroy (void *core_, void *sf_) +{ + delete (xs::prefix_filter_t*) sf_; +} + +static int sf_subscribe (void *core_, void *sf_, + const unsigned char *data_, size_t size_) +{ + return ((xs::prefix_filter_t*) sf_)->subscribe (core_, NULL, + data_, size_); +} + +static int sf_unsubscribe (void *core_, void *sf_, + const unsigned char *data_, size_t size_) +{ + return ((xs::prefix_filter_t*) sf_)->unsubscribe (core_, NULL, + data_, size_); +} + +static void sf_enumerate (void *core_, void *sf_) +{ + ((xs::prefix_filter_t*) sf_)->enumerate (core_); +} + +static int sf_match (void *core_, void *sf_, + const unsigned char *data_, size_t size_) +{ + return ((xs::prefix_filter_t*) sf_)->match (core_, data_, size_); +} + +static xs_filter_t filter = { + XS_PLUGIN_FILTER, + 1, + id, + pf_create, + pf_destroy, + pf_subscribe, + pf_unsubscribe, + pf_unsubscribe_all, + pf_match, + sf_create, + sf_destroy, + sf_subscribe, + sf_unsubscribe, + sf_enumerate, + sf_match, +}; + +void *xs::prefix_filter = (void*) &filter; + -- cgit v1.2.3