summaryrefslogtreecommitdiff
path: root/src/prefix_filter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/prefix_filter.cpp')
-rw-r--r--src/prefix_filter.cpp664
1 files changed, 664 insertions, 0 deletions
diff --git a/src/prefix_filter.cpp b/src/prefix_filter.cpp
new file mode 100644
index 0000000..5d21fb6
--- /dev/null
+++ b/src/prefix_filter.cpp
@@ -0,0 +1,664 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2011-2012 Spotify AB
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <new>
+
+#include "../include/xs.h"
+
+#include "prefix_filter.hpp"
+#include "err.hpp"
+
+xs::prefix_filter_t::prefix_filter_t ()
+{
+ init (&root);
+}
+
+xs::prefix_filter_t::~prefix_filter_t ()
+{
+ close (&root);
+}
+
+int xs::prefix_filter_t::subscribe (void *core_, void *subscriber_,
+ const unsigned char *data_, size_t size_)
+{
+ return add (&root, data_, size_, subscriber_) ? 1 : 0;
+}
+
+int xs::prefix_filter_t::unsubscribe (void *core_, void *subscriber_,
+ const unsigned char *data_, size_t size_)
+{
+ return rm (&root, data_, size_, subscriber_) ? 1 : 0;
+}
+
+void xs::prefix_filter_t::unsubscribe_all (void *core_, void *subscriber_)
+{
+ rm (&root, subscriber_, core_);
+}
+
+void xs::prefix_filter_t::enumerate (void *core_)
+{
+ unsigned char *buff = NULL;
+ list (&root, &buff, 0, 0, core_);
+ free (buff);
+}
+
+int xs::prefix_filter_t::match (void *core_,
+ const unsigned char *data_, size_t size_)
+{
+ // This function is on critical path. It deliberately doesn't use
+ // recursion to get a bit better performance.
+ node_t *current = &root;
+ while (true) {
+
+ // We've found a corresponding subscription!
+ if (current->subscribers)
+ return 1;
+
+ // We've checked all the data and haven't found matching subscription.
+ if (!size_)
+ return 0;
+
+ // If there's no corresponding slot for the first character
+ // of the prefix, the message does not match.
+ unsigned char c = *data_;
+ if (c < current->min || c >= current->min + current->count)
+ return 0;
+
+ // Move to the next character.
+ if (current->count == 1)
+ current = current->next.node;
+ else {
+ current = current->next.table [c - current->min];
+ if (!current)
+ return 0;
+ }
+ data_++;
+ size_--;
+ }
+
+}
+
+void xs::prefix_filter_t::match_all (void *core_,
+ const unsigned char *data_, size_t size_)
+{
+ node_t *current = &root;
+ while (true) {
+
+ // Signal the subscribers attached to this node.
+ if (current->subscribers) {
+ for (node_t::subscribers_t::iterator it =
+ current->subscribers->begin ();
+ it != current->subscribers->end (); ++it) {
+ int rc = xs_filter_matching (core_, it->first);
+ errno_assert (rc == 0);
+ }
+ }
+
+ // If we are at the end of the message, there's nothing more to match.
+ if (!size_)
+ break;
+
+ // If there are no subnodes in the trie, return.
+ if (current->count == 0)
+ break;
+
+ // If there's one subnode (optimisation).
+ if (current->count == 1) {
+ if (data_ [0] != current->min)
+ break;
+ current = current->next.node;
+ data_++;
+ size_--;
+ continue;
+ }
+
+ // If there are multiple subnodes.
+ if (data_ [0] < current->min || data_ [0] >=
+ current->min + current->count)
+ break;
+ if (!current->next.table [data_ [0] - current->min])
+ break;
+ current = current->next.table [data_ [0] - current->min];
+ data_++;
+ size_--;
+ }
+}
+
+void xs::prefix_filter_t::init (node_t *node_)
+{
+ node_->subscribers = NULL;
+ node_->min = 0;
+ node_->count = 0;
+ node_->live_nodes = 0;
+}
+
+void xs::prefix_filter_t::close (node_t *node_)
+{
+ if (node_->subscribers) {
+ delete node_->subscribers;
+ node_->subscribers = NULL;
+ }
+
+ if (node_->count == 1) {
+ xs_assert (node_->next.node);
+ close (node_->next.node);
+ delete node_->next.node;
+ node_->next.node = NULL;
+ }
+ else if (node_->count > 1) {
+ for (unsigned short i = 0; i != node_->count; ++i)
+ if (node_->next.table [i]) {
+ close (node_->next.table [i]);
+ delete node_->next.table [i];
+ }
+ free (node_->next.table);
+ }
+}
+
+bool xs::prefix_filter_t::add (node_t *node_, const unsigned char *prefix_,
+ size_t size_, void *subscriber_)
+{
+ // We are at the node corresponding to the prefix. We are done.
+ if (!size_) {
+ bool result = !node_->subscribers;
+ if (!node_->subscribers)
+ node_->subscribers = new (std::nothrow) node_t::subscribers_t;
+ node_t::subscribers_t::iterator it = node_->subscribers->insert (
+ node_t::subscribers_t::value_type (subscriber_, 0)).first;
+ ++it->second;
+ return result;
+ }
+
+ unsigned char c = *prefix_;
+ if (c < node_->min || c >= node_->min + node_->count) {
+
+ // The character is out of range of currently handled
+ // charcters. We have to extend the table.
+ if (!node_->count) {
+ node_->min = c;
+ node_->count = 1;
+ node_->next.node = NULL;
+ }
+ else if (node_->count == 1) {
+ unsigned char oldc = node_->min;
+ node_t *oldp = node_->next.node;
+ node_->count =
+ (node_->min < c ? c - node_->min : node_->min - c) + 1;
+ node_->next.table = (node_t**)
+ malloc (sizeof (node_t*) * node_->count);
+ xs_assert (node_->next.table);
+ for (unsigned short i = 0; i != node_->count; ++i)
+ node_->next.table [i] = 0;
+ node_->min = std::min (node_->min, c);
+ node_->next.table [oldc - node_->min] = oldp;
+ }
+ else if (node_->min < c) {
+
+ // The new character is above the current character range.
+ unsigned short old_count = node_->count;
+ node_->count = c - node_->min + 1;
+ node_->next.table = (node_t**) realloc ((void*) node_->next.table,
+ sizeof (node_t*) * node_->count);
+ xs_assert (node_->next.table);
+ for (unsigned short i = old_count; i != node_->count; i++)
+ node_->next.table [i] = NULL;
+ }
+ else {
+
+ // The new character is below the current character range.
+ unsigned short old_count = node_->count;
+ node_->count = (node_->min + old_count) - c;
+ node_->next.table = (node_t**) realloc ((void*) node_->next.table,
+ sizeof (node_t*) * node_->count);
+ xs_assert (node_->next.table);
+ memmove (node_->next.table + node_->min - c, node_->next.table,
+ old_count * sizeof (node_t*));
+ for (unsigned short i = 0; i != node_->min - c; i++)
+ node_->next.table [i] = NULL;
+ node_->min = c;
+ }
+ }
+
+ // If next node does not exist, create one.
+ if (node_->count == 1) {
+ if (!node_->next.node) {
+ node_->next.node = new (std::nothrow) node_t;
+ alloc_assert (node_->next.node);
+ init (node_->next.node);
+ ++node_->live_nodes;
+ xs_assert (node_->next.node);
+ }
+ return add (node_->next.node, prefix_ + 1, size_ - 1, subscriber_);
+ }
+ else {
+ if (!node_->next.table [c - node_->min]) {
+ node_->next.table [c - node_->min] = new (std::nothrow) node_t;
+ alloc_assert (node_->next.table [c - node_->min]);
+ init (node_->next.table [c - node_->min]);
+ ++node_->live_nodes;
+ xs_assert (node_->next.table [c - node_->min]);
+ }
+ return add (node_->next.table [c - node_->min], prefix_ + 1, size_ - 1,
+ subscriber_);
+ }
+}
+
+
+void xs::prefix_filter_t::rm (node_t *node_, void *subscriber_, void *arg_)
+{
+ unsigned char *buff = NULL;
+ rm_helper (node_, subscriber_, &buff, 0, 0, arg_);
+ free (buff);
+}
+
+void xs::prefix_filter_t::rm_helper (node_t *node_, void *subscribers_,
+ unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_, void *arg_)
+{
+ // Remove the subscription from this node.
+ if (node_->subscribers) {
+ node_t::subscribers_t::iterator it =
+ node_->subscribers->find (subscribers_);
+ if (it != node_->subscribers->end ()) {
+ xs_assert (it->second);
+ --it->second;
+ if (!it->second) {
+ node_->subscribers->erase (it);
+ if (node_->subscribers->empty ()) {
+ int rc = xs_filter_unsubscribed (arg_, *buff_, buffsize_);
+ errno_assert (rc == 0);
+ delete node_->subscribers;
+ node_->subscribers = 0;
+ }
+ }
+ }
+ }
+
+ // Adjust the buffer.
+ if (buffsize_ >= maxbuffsize_) {
+ maxbuffsize_ = buffsize_ + 256;
+ *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
+ alloc_assert (*buff_);
+ }
+
+ // If there are no subnodes in the trie, return.
+ if (node_->count == 0)
+ return;
+
+ // If there's one subnode (optimisation).
+ if (node_->count == 1) {
+ (*buff_) [buffsize_] = node_->min;
+ buffsize_++;
+ rm_helper (node_->next.node, subscribers_, buff_, buffsize_,
+ maxbuffsize_, arg_);
+
+ // Prune the node if it was made redundant by the removal
+ if (is_redundant (node_->next.node)) {
+ close (node_->next.node);
+ delete node_->next.node;
+ node_->next.node = 0;
+ node_->count = 0;
+ --node_->live_nodes;
+ xs_assert (node_->live_nodes == 0);
+ }
+ return;
+ }
+
+ // If there are multiple subnodes.
+ //
+ // New min non-null character in the node table after the removal.
+ unsigned char new_min = node_->min;
+ // New max non-null character in the node table after the removal.
+ unsigned char new_max = node_->min + node_->count - 1;
+ for (unsigned short c = 0; c != node_->count; c++) {
+ (*buff_) [buffsize_] = node_->min + c;
+ if (node_->next.table [c]) {
+ rm_helper (node_->next.table [c], subscribers_, buff_,
+ buffsize_ + 1, maxbuffsize_, arg_);
+
+ // Prune redudant nodes from the the trie.
+ if (is_redundant (node_->next.table [c])) {
+ close (node_->next.table [c]);
+ delete node_->next.table [c];
+ node_->next.table [c] = 0;
+
+ xs_assert (node_->live_nodes > 0);
+ --node_->live_nodes;
+ }
+ else {
+ if (c + node_->min > new_min)
+ new_min = c + node_->min;
+ if (c + node_->min < new_max)
+ new_max = c + node_->min;
+ }
+ }
+ }
+
+ xs_assert (node_->count > 1);
+
+ // Compact the node table if possible.
+ if (node_->live_nodes == 1) {
+
+ // If there's only one live node in the table we can
+ // switch to using the more compact single-node
+ // representation.
+ xs_assert (new_min == new_max);
+ xs_assert (new_min >= node_->min &&
+ new_min < node_->min + node_->count);
+ node_t *node = node_->next.table [new_min - node_->min];
+ xs_assert (node);
+ free (node_->next.table);
+ node_->next.node = node;
+ node_->count = 1;
+ node_->min = new_min;
+ }
+ else if (node_->live_nodes > 1 &&
+ (new_min > node_->min || new_max < node_->min + node_->count - 1)) {
+ xs_assert (new_max - new_min + 1 > 1);
+
+ node_t **old_table = node_->next.table;
+ xs_assert (new_min > node_->min ||
+ new_max < node_->min + node_->count - 1);
+ xs_assert (new_min >= node_->min);
+ xs_assert (new_max <= node_->min + node_->count - 1);
+ xs_assert (new_max - new_min + 1 < node_->count);
+
+ node_->count = new_max - new_min + 1;
+ node_->next.table = (node_t**) malloc (sizeof (node_t*) * node_->count);
+ xs_assert (node_->next.table);
+
+ memmove (node_->next.table, old_table + (new_min - node_->min),
+ sizeof (node_t*) * node_->count);
+ free (old_table);
+
+ node_->min = new_min;
+ }
+}
+
+bool xs::prefix_filter_t::rm (node_t *node_, const unsigned char *prefix_,
+ size_t size_, void *subscriber_)
+{
+ if (!size_) {
+
+ // Remove the subscription from this node.
+ if (node_->subscribers) {
+ node_t::subscribers_t::iterator it =
+ node_->subscribers->find (subscriber_);
+ if (it != node_->subscribers->end ()) {
+ xs_assert (it->second);
+ --it->second;
+ if (!it->second) {
+ node_->subscribers->erase (it);
+ if (node_->subscribers->empty ()) {
+ delete node_->subscribers;
+ node_->subscribers = 0;
+ }
+ }
+ }
+ }
+ return !node_->subscribers;
+ }
+
+ unsigned char c = *prefix_;
+ if (!node_->count || c < node_->min || c >= node_->min + node_->count)
+ return false;
+
+ node_t *next_node = node_->count == 1 ? node_->next.node :
+ node_->next.table [c - node_->min];
+
+ if (!next_node)
+ return false;
+
+ bool ret = rm (next_node, prefix_ + 1, size_ - 1, subscriber_);
+
+ if (is_redundant (next_node)) {
+ close (next_node);
+ delete next_node;
+ xs_assert (node_->count > 0);
+
+ if (node_->count == 1) {
+ node_->next.node = 0;
+ node_->count = 0;
+ --node_->live_nodes;
+ xs_assert (node_->live_nodes == 0);
+ }
+ else {
+ node_->next.table [c - node_->min] = 0;
+ xs_assert (node_->live_nodes > 1);
+ --node_->live_nodes;
+
+ // Compact the table if possible.
+ if (node_->live_nodes == 1) {
+ // If there's only one live node in the table we can
+ // switch to using the more compact single-node
+ // representation
+ node_t *node = 0;
+ for (unsigned short i = 0; i < node_->count; ++i) {
+ if (node_->next.table [i]) {
+ node = node_->next.table [i];
+ node_->min += i;
+ break;
+ }
+ }
+
+ xs_assert (node);
+ free (node_->next.table);
+ node_->next.node = node;
+ node_->count = 1;
+ }
+ else if (c == node_->min) {
+
+ // We can compact the table "from the left".
+ unsigned char new_min = node_->min;
+ for (unsigned short i = 1; i < node_->count; ++i) {
+ if (node_->next.table [i]) {
+ new_min = i + node_->min;
+ break;
+ }
+ }
+ xs_assert (new_min != node_->min);
+
+ node_t **old_table = node_->next.table;
+ xs_assert (new_min > node_->min);
+ xs_assert (node_->count > new_min - node_->min);
+
+ node_->count = node_->count - (new_min - node_->min);
+ node_->next.table =
+ (node_t**) malloc (sizeof (node_t*) * node_->count);
+ xs_assert (node_->next.table);
+
+ memmove (node_->next.table, old_table + (new_min - node_->min),
+ sizeof (node_t*) * node_->count);
+ free (old_table);
+
+ node_->min = new_min;
+ }
+ else if (c == node_->min + node_->count - 1) {
+
+ // We can compact the table "from the right".
+ unsigned short new_count = node_->count;
+ for (unsigned short i = 1; i < node_->count; ++i) {
+ if (node_->next.table [node_->count - 1 - i]) {
+ new_count = node_->count - i;
+ break;
+ }
+ }
+ xs_assert (new_count != node_->count);
+ node_->count = new_count;
+
+ node_t **old_table = node_->next.table;
+ node_->next.table =
+ (node_t**) malloc (sizeof (node_t*) * node_->count);
+ xs_assert (node_->next.table);
+
+ memmove (node_->next.table, old_table,
+ sizeof (node_t*) * node_->count);
+ free (old_table);
+ }
+ }
+ }
+
+ return ret;
+}
+
+void xs::prefix_filter_t::list (node_t *node_, unsigned char **buff_,
+ size_t buffsize_, size_t maxbuffsize_, void *arg_)
+{
+ // If this node is a subscription, apply the function.
+ if (node_->subscribers) {
+ int rc = xs_filter_subscribed (arg_, *buff_, buffsize_);
+ errno_assert (rc == 0);
+ }
+
+ // Adjust the buffer.
+ if (buffsize_ >= maxbuffsize_) {
+ maxbuffsize_ = buffsize_ + 256;
+ *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
+ xs_assert (*buff_);
+ }
+
+ // If there are no subnodes in the trie, return.
+ if (node_->count == 0)
+ return;
+
+ // If there's one subnode (optimisation).
+ if (node_->count == 1) {
+ (*buff_) [buffsize_] = node_->min;
+ buffsize_++;
+ list (node_->next.node, buff_, buffsize_, maxbuffsize_, arg_);
+ return;
+ }
+
+ // If there are multiple subnodes.
+ for (unsigned short c = 0; c != node_->count; c++) {
+ (*buff_) [buffsize_] = node_->min + c;
+ if (node_->next.table [c])
+ list (node_->next.table [c], buff_, buffsize_ + 1, maxbuffsize_,
+ arg_);
+ }
+}
+
+bool xs::prefix_filter_t::is_redundant (node_t *node_)
+{
+ return !node_->subscribers && node_->live_nodes == 0;
+}
+
+// Implementation of the C interface of the filter.
+// Following functions convert raw C calls into calls to C++ object methods.
+
+static int id (void *core_)
+{
+ return XS_FILTER_PREFIX;
+}
+
+static void *pf_create (void *core_)
+{
+ void *pf = (void*) new (std::nothrow) xs::prefix_filter_t;
+ alloc_assert (pf);
+ return pf;
+}
+
+static void pf_destroy (void *core_, void *pf_)
+{
+ delete (xs::prefix_filter_t*) pf_;
+}
+
+static int pf_subscribe (void *core_, void *pf_, void *subscriber_,
+ const unsigned char *data_, size_t size_)
+{
+ return ((xs::prefix_filter_t*) pf_)->subscribe (core_, subscriber_,
+ data_, size_);
+}
+
+static int pf_unsubscribe (void *core_, void *pf_, void *subscriber_,
+ const unsigned char *data_, size_t size_)
+{
+ return ((xs::prefix_filter_t*) pf_)->unsubscribe (core_, subscriber_,
+ data_, size_);
+}
+
+static void pf_unsubscribe_all (void *core_, void *pf_, void *subscriber_)
+{
+ ((xs::prefix_filter_t*) pf_)->unsubscribe_all (core_, subscriber_);
+}
+
+static void pf_match (void *core_, void *pf_,
+ const unsigned char *data_, size_t size_)
+{
+ ((xs::prefix_filter_t*) pf_)->match_all (core_, data_, size_);
+}
+
+static void *sf_create (void *core_)
+{
+ void *sf = (void*) new (std::nothrow) xs::prefix_filter_t;
+ alloc_assert (sf);
+ return sf;
+}
+
+static void sf_destroy (void *core_, void *sf_)
+{
+ delete (xs::prefix_filter_t*) sf_;
+}
+
+static int sf_subscribe (void *core_, void *sf_,
+ const unsigned char *data_, size_t size_)
+{
+ return ((xs::prefix_filter_t*) sf_)->subscribe (core_, NULL,
+ data_, size_);
+}
+
+static int sf_unsubscribe (void *core_, void *sf_,
+ const unsigned char *data_, size_t size_)
+{
+ return ((xs::prefix_filter_t*) sf_)->unsubscribe (core_, NULL,
+ data_, size_);
+}
+
+static void sf_enumerate (void *core_, void *sf_)
+{
+ ((xs::prefix_filter_t*) sf_)->enumerate (core_);
+}
+
+static int sf_match (void *core_, void *sf_,
+ const unsigned char *data_, size_t size_)
+{
+ return ((xs::prefix_filter_t*) sf_)->match (core_, data_, size_);
+}
+
+static xs_filter_t filter = {
+ XS_PLUGIN_FILTER,
+ 1,
+ id,
+ pf_create,
+ pf_destroy,
+ pf_subscribe,
+ pf_unsubscribe,
+ pf_unsubscribe_all,
+ pf_match,
+ sf_create,
+ sf_destroy,
+ sf_subscribe,
+ sf_unsubscribe,
+ sf_enumerate,
+ sf_match,
+};
+
+void *xs::prefix_filter = (void*) &filter;
+