/*
Copyright (c) 2012 250bpm s.r.o.
Copyright (c) 2011-2012 Spotify AB
Copyright (c) 2012 Other contributors as noted in the AUTHORS file
This file is part of Crossroads I/O project.
Crossroads I/O is free software; you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
Crossroads is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <new>
#include "../include/xs.h"
#include "prefix_filter.hpp"
#include "err.hpp"
// Builds an empty filter: the root node of the trie is reset so that it
// carries no subscribers and no child nodes.
xs::prefix_filter_t::prefix_filter_t ()
{
    node_t *const top = &root;
    init (top);
}
// Tears the filter down by releasing everything that hangs off the root
// node (subscriber sets, child nodes and node tables).
xs::prefix_filter_t::~prefix_filter_t ()
{
    node_t *const top = &root;
    close (top);
}
// Registers 'subscriber_' for the prefix given by 'data_'/'size_'.
// Returns 1 when the prefix had no subscribers before this call (i.e. add()
// reported that it created the subscriber set), 0 otherwise.
// 'core_' is not used by this function.
int xs::prefix_filter_t::subscribe (void *core_, void *subscriber_,
    const unsigned char *data_, size_t size_)
{
    const bool first_for_prefix = add (&root, data_, size_, subscriber_);
    if (first_for_prefix)
        return 1;
    return 0;
}
// Drops one subscription of 'subscriber_' for the prefix given by
// 'data_'/'size_'. Returns 1 when rm() reports that the prefix node is left
// without any subscribers, 0 otherwise (including when the prefix is not
// present in the trie). 'core_' is not used by this function.
int xs::prefix_filter_t::unsubscribe (void *core_, void *subscriber_,
    const unsigned char *data_, size_t size_)
{
    const bool prefix_now_empty = rm (&root, data_, size_, subscriber_);
    if (prefix_now_empty)
        return 1;
    return 0;
}
void xs::prefix_filter_t::unsubscribe_all (void *core_, void *subscriber_)
{
rm (&root, subscriber_, core_);
}
// Reports every prefix that currently has subscribers by walking the trie
// from the root. The scratch buffer used to assemble prefixes is grown by
// list() on demand and released here once the walk is over.
void xs::prefix_filter_t::enumerate (void *core_)
{
    unsigned char *scratch = NULL;
    list (&root, &scratch, 0, 0, core_);
    free (scratch);
}
int xs::prefix_filter_t::match (void *core_,
const unsigned char *data_, size_t size_)
{
// This function is on critical path. It deliberately doesn't use
// recursion to get a bit better performance.
node_t *current = &root;
while (true) {
// We've found a corresponding subscription!
if (current->subscribers)
return 1;
// We've checked all the data and haven't found matching subscription.
if (!size_)
return 0;
// If there's no corresponding slot for the first character
// of the prefix, the message does not match.
unsigned char c = *data_;
if (c < current->min || c >= current->min + current->count)
return 0;
// Move to the next character.
if (current->count == 1)
current = current->next.node;
else {
current = current->next.table [c - current->min];
if (!current)
return 0;
}
data_++;
size_--;
}
}
void xs::prefix_filter_t::match_all (void *core_,
const unsigned char *data_, size_t size_)
{
node_t *current = &root;
while (true) {
// Signal the subscribers attached to this node.
if (current->subscribers) {
for (node_t::subscribers_t::iterator it =
current->subscribers->begin ();
it != current->subscribers->end (); ++it) {
int rc = xs_filter_matching (core_, it->first);
errno_assert (rc == 0);
}
}
// If we are at the end of the message, there's nothing more to match.
if (!size_)
break;
// If there are no subnodes in the trie, return.
if (current->count == 0)
break;
// If there's one subnode (optimisation).
if (current->count == 1) {
if (data_ [0] != current->min)
break;
current = current->next.node;
data_++;
size_--;
continue;
}
// If there are multiple subnodes.
if (data_ [0] < current->min || data_ [0] >=
current->min + current->count)
break;
if (!current->next.table [data_ [0] - current->min])
break;
current = current->next.table [data_ [0] - current->min];
data_++;
size_--;
}
}
// Puts 'node_' into the "empty" state: no child slots, no live children and
// no subscriber set. The 'next' union is left untouched.
void xs::prefix_filter_t::init (node_t *node_)
{
    node_->count = 0;
    node_->live_nodes = 0;
    node_->min = 0;
    node_->subscribers = NULL;
}
// Releases everything owned by 'node_': its subscriber set and, recursively,
// all of its descendants together with the node table (if any). The node
// itself is not deleted; that is left to the caller.
void xs::prefix_filter_t::close (node_t *node_)
{
    if (node_->subscribers) {
        delete node_->subscribers;
        node_->subscribers = NULL;
    }

    // Leaf: nothing else to release.
    if (node_->count == 0)
        return;

    // Single child, stored directly in the node.
    if (node_->count == 1) {
        node_t *only_child = node_->next.node;
        xs_assert (only_child);
        close (only_child);
        delete only_child;
        node_->next.node = NULL;
        return;
    }

    // Several children: release the occupied slots, then the table itself.
    node_t **table = node_->next.table;
    for (unsigned short slot = 0; slot != node_->count; ++slot) {
        node_t *child = table [slot];
        if (!child)
            continue;
        close (child);
        delete child;
    }
    free (table);
}
// Adds one subscription of 'subscriber_' for the prefix 'prefix_'/'size_',
// starting the descent at 'node_'. Missing nodes along the path are created
// and node tables are grown as needed. Each call increments the subscriber's
// reference count on the terminal node.
//
// Returns true when the terminal node had no subscriber set before this
// call, false otherwise.
//
// Allocation failures are fatal (alloc_assert / xs_assert).
bool xs::prefix_filter_t::add (node_t *node_, const unsigned char *prefix_,
    size_t size_, void *subscriber_)
{
    // We are at the node corresponding to the prefix. We are done.
    if (!size_) {
        bool result = !node_->subscribers;
        if (!node_->subscribers) {
            node_->subscribers = new (std::nothrow) node_t::subscribers_t;

            // nothrow new yields NULL on failure; check before using it.
            alloc_assert (node_->subscribers);
        }
        node_t::subscribers_t::iterator it = node_->subscribers->insert (
            node_t::subscribers_t::value_type (subscriber_, 0)).first;
        ++it->second;
        return result;
    }

    unsigned char c = *prefix_;
    if (c < node_->min || c >= node_->min + node_->count) {

        // The character is out of range of currently handled
        // characters. We have to extend the table.
        if (!node_->count) {

            // Leaf node: switch to the single-child representation.
            node_->min = c;
            node_->count = 1;
            node_->next.node = NULL;
        }
        else if (node_->count == 1) {

            // Single child: convert to a table spanning both the old and
            // the new character and move the old child into its slot.
            unsigned char oldc = node_->min;
            node_t *oldp = node_->next.node;
            node_->count =
                (node_->min < c ? c - node_->min : node_->min - c) + 1;
            node_->next.table = (node_t**)
                malloc (sizeof (node_t*) * node_->count);
            xs_assert (node_->next.table);
            for (unsigned short i = 0; i != node_->count; ++i)
                node_->next.table [i] = 0;
            node_->min = std::min (node_->min, c);
            node_->next.table [oldc - node_->min] = oldp;
        }
        else if (node_->min < c) {

            // The new character is above the current character range.
            unsigned short old_count = node_->count;
            node_->count = c - node_->min + 1;
            node_->next.table = (node_t**) realloc ((void*) node_->next.table,
                sizeof (node_t*) * node_->count);
            xs_assert (node_->next.table);
            for (unsigned short i = old_count; i != node_->count; i++)
                node_->next.table [i] = NULL;
        }
        else {

            // The new character is below the current character range.
            // Existing entries are shifted up to make room at the front.
            unsigned short old_count = node_->count;
            node_->count = (node_->min + old_count) - c;
            node_->next.table = (node_t**) realloc ((void*) node_->next.table,
                sizeof (node_t*) * node_->count);
            xs_assert (node_->next.table);
            memmove (node_->next.table + node_->min - c, node_->next.table,
                old_count * sizeof (node_t*));
            for (unsigned short i = 0; i != node_->min - c; i++)
                node_->next.table [i] = NULL;
            node_->min = c;
        }
    }

    // If next node does not exist, create one.
    if (node_->count == 1) {
        if (!node_->next.node) {
            node_->next.node = new (std::nothrow) node_t;
            alloc_assert (node_->next.node);
            init (node_->next.node);
            ++node_->live_nodes;
            xs_assert (node_->next.node);
        }
        return add (node_->next.node, prefix_ + 1, size_ - 1, subscriber_);
    }
    else {
        if (!node_->next.table [c - node_->min]) {
            node_->next.table [c - node_->min] = new (std::nothrow) node_t;
            alloc_assert (node_->next.table [c - node_->min]);
            init (node_->next.table [c - node_->min]);
            ++node_->live_nodes;
            xs_assert (node_->next.table [c - node_->min]);
        }
        return add (node_->next.table [c - node_->min], prefix_ + 1, size_ - 1,
            subscriber_);
    }
}
// Entry point of the whole-trie removal walk for 'subscriber_', rooted at
// 'node_'. Owns the scratch buffer in which rm_helper() assembles the prefix
// of the node it is visiting; the buffer starts out empty and is freed when
// the walk finishes. 'arg_' is passed through to rm_helper() unchanged.
void xs::prefix_filter_t::rm (node_t *node_, void *subscriber_, void *arg_)
{
    unsigned char *scratch = NULL;
    const size_t used = 0;
    const size_t capacity = 0;
    rm_helper (node_, subscriber_, &scratch, used, capacity, arg_);
    free (scratch);
}
// Recursive worker of the whole-trie removal walk.
//
// Visits 'node_' and all of its descendants. On every node it decrements the
// reference count 'subscribers_' holds there (if any); when that empties the
// node's subscriber set, xs_filter_unsubscribed (arg_, prefix, length) is
// called with the prefix of the node. Children that end up with neither
// subscribers nor live children are pruned, and the node's child storage is
// compacted afterwards.
//
//   buff_         scratch buffer holding the prefix of 'node_'; may be
//                 reallocated, hence the double pointer
//   buffsize_     number of valid bytes in the buffer (depth of 'node_')
//   maxbuffsize_  capacity of the buffer as known to this call
//
// NOTE(review): the reference count is decremented by one only, so a
// subscriber registered several times for the same prefix stays attached to
// it after this walk - confirm that this is what callers expect.
void xs::prefix_filter_t::rm_helper (node_t *node_, void *subscribers_,
    unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_, void *arg_)
{
    // Remove the subscription from this node.
    if (node_->subscribers) {
        node_t::subscribers_t::iterator it =
            node_->subscribers->find (subscribers_);
        if (it != node_->subscribers->end ()) {
            xs_assert (it->second);
            --it->second;
            if (!it->second) {
                node_->subscribers->erase (it);
                if (node_->subscribers->empty ()) {
                    int rc = xs_filter_unsubscribed (arg_, *buff_, buffsize_);
                    errno_assert (rc == 0);
                    delete node_->subscribers;
                    node_->subscribers = 0;
                }
            }
        }
    }

    // Adjust the buffer so that one more character can be appended.
    if (buffsize_ >= maxbuffsize_) {
        maxbuffsize_ = buffsize_ + 256;
        *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
        alloc_assert (*buff_);
    }

    // If there are no subnodes in the trie, return.
    if (node_->count == 0)
        return;

    // If there's one subnode (optimisation).
    if (node_->count == 1) {
        (*buff_) [buffsize_] = node_->min;
        buffsize_++;
        rm_helper (node_->next.node, subscribers_, buff_, buffsize_,
            maxbuffsize_, arg_);

        // Prune the node if it was made redundant by the removal.
        if (is_redundant (node_->next.node)) {
            close (node_->next.node);
            delete node_->next.node;
            node_->next.node = 0;
            node_->count = 0;
            --node_->live_nodes;
            xs_assert (node_->live_nodes == 0);
        }
        return;
    }

    // If there are multiple subnodes.
    //
    // The table is scanned from left to right and the lowest and the highest
    // character that still have a child after the removal are recorded.
    // Both trackers start at the opposite end of the range so that the first
    // surviving child always updates them.
    unsigned char new_min = node_->min + node_->count - 1;
    unsigned char new_max = node_->min;
    for (unsigned short c = 0; c != node_->count; c++) {
        (*buff_) [buffsize_] = node_->min + c;
        if (node_->next.table [c]) {
            rm_helper (node_->next.table [c], subscribers_, buff_,
                buffsize_ + 1, maxbuffsize_, arg_);

            // Prune redundant nodes from the trie.
            if (is_redundant (node_->next.table [c])) {
                close (node_->next.table [c]);
                delete node_->next.table [c];
                node_->next.table [c] = 0;
                xs_assert (node_->live_nodes > 0);
                --node_->live_nodes;
            }
            else {

                // The child survives, so it is a candidate for the new
                // lower and upper bound of the table.
                if (c + node_->min < new_min)
                    new_min = c + node_->min;
                if (c + node_->min > new_max)
                    new_max = c + node_->min;
            }
        }
    }
    xs_assert (node_->count > 1);

    if (node_->live_nodes == 0) {

        // Every child was pruned. Drop the table and fall back to the leaf
        // representation so that the node never carries a table without
        // live entries (rm() and add() rely on that).
        free (node_->next.table);
        node_->next.table = NULL;
        node_->count = 0;
    }
    else if (node_->live_nodes == 1) {

        // If there's only one live node in the table we can
        // switch to using the more compact single-node
        // representation.
        xs_assert (new_min == new_max);
        xs_assert (new_min >= node_->min &&
            new_min < node_->min + node_->count);
        node_t *node = node_->next.table [new_min - node_->min];
        xs_assert (node);
        free (node_->next.table);
        node_->next.node = node;
        node_->count = 1;
        node_->min = new_min;
    }
    else if (node_->live_nodes > 1 &&
          (new_min > node_->min || new_max < node_->min + node_->count - 1)) {

        // Slots were freed at one or both ends of the table: shrink the
        // table to the range that is still in use.
        xs_assert (new_max - new_min + 1 > 1);
        node_t **old_table = node_->next.table;
        xs_assert (new_min >= node_->min);
        xs_assert (new_max <= node_->min + node_->count - 1);
        xs_assert (new_max - new_min + 1 < node_->count);
        node_->count = new_max - new_min + 1;
        node_->next.table = (node_t**) malloc (sizeof (node_t*) * node_->count);
        xs_assert (node_->next.table);
        memmove (node_->next.table, old_table + (new_min - node_->min),
            sizeof (node_t*) * node_->count);
        free (old_table);
        node_->min = new_min;
    }
}
// Removes one subscription of 'subscriber_' for the prefix 'prefix_'/'size_',
// descending recursively from 'node_'. On the way back up, child nodes that
// became redundant (no subscribers, no live children) are destroyed and the
// parent's child storage is compacted.
//
// Returns false when the prefix path does not exist in the trie. Otherwise
// returns whether the terminal node has no subscriber set after the call.
// NOTE(review): that is also true when the terminal node had no subscribers
// to begin with, i.e. when nothing was actually removed.
bool xs::prefix_filter_t::rm (node_t *node_, const unsigned char *prefix_,
size_t size_, void *subscriber_)
{
// The whole prefix has been consumed: this is the terminal node.
if (!size_) {
// Remove the subscription from this node.
if (node_->subscribers) {
node_t::subscribers_t::iterator it =
node_->subscribers->find (subscriber_);
if (it != node_->subscribers->end ()) {
xs_assert (it->second);
// Drop one reference; the entry goes away with the last one.
--it->second;
if (!it->second) {
node_->subscribers->erase (it);
if (node_->subscribers->empty ()) {
delete node_->subscribers;
node_->subscribers = 0;
}
}
}
}
return !node_->subscribers;
}
// Locate the child for the next character; bail out if there is none.
unsigned char c = *prefix_;
if (!node_->count || c < node_->min || c >= node_->min + node_->count)
return false;
node_t *next_node = node_->count == 1 ? node_->next.node :
node_->next.table [c - node_->min];
if (!next_node)
return false;
bool ret = rm (next_node, prefix_ + 1, size_ - 1, subscriber_);
// Prune the child if the removal left it without subscribers and children.
if (is_redundant (next_node)) {
close (next_node);
delete next_node;
xs_assert (node_->count > 0);
if (node_->count == 1) {
// The only child is gone: this node becomes a leaf.
node_->next.node = 0;
node_->count = 0;
--node_->live_nodes;
xs_assert (node_->live_nodes == 0);
}
else {
// Clear the child's slot in the table.
node_->next.table [c - node_->min] = 0;
xs_assert (node_->live_nodes > 1);
--node_->live_nodes;
// Compact the table if possible.
if (node_->live_nodes == 1) {
// If there's only one live node in the table we can
// switch to using the more compact single-node
// representation
node_t *node = 0;
for (unsigned short i = 0; i < node_->count; ++i) {
if (node_->next.table [i]) {
node = node_->next.table [i];
// 'min' becomes the character of the surviving child.
node_->min += i;
break;
}
}
xs_assert (node);
free (node_->next.table);
node_->next.node = node;
node_->count = 1;
}
else if (c == node_->min) {
// We can compact the table "from the left".
// Find the first occupied slot after the one just cleared.
unsigned char new_min = node_->min;
for (unsigned short i = 1; i < node_->count; ++i) {
if (node_->next.table [i]) {
new_min = i + node_->min;
break;
}
}
xs_assert (new_min != node_->min);
node_t **old_table = node_->next.table;
xs_assert (new_min > node_->min);
xs_assert (node_->count > new_min - node_->min);
node_->count = node_->count - (new_min - node_->min);
node_->next.table =
(node_t**) malloc (sizeof (node_t*) * node_->count);
xs_assert (node_->next.table);
// Copy the tail of the old table, starting at the new minimum.
memmove (node_->next.table, old_table + (new_min - node_->min),
sizeof (node_t*) * node_->count);
free (old_table);
node_->min = new_min;
}
else if (c == node_->min + node_->count - 1) {
// We can compact the table "from the right".
// Find the last occupied slot before the one just cleared.
unsigned short new_count = node_->count;
for (unsigned short i = 1; i < node_->count; ++i) {
if (node_->next.table [node_->count - 1 - i]) {
new_count = node_->count - i;
break;
}
}
xs_assert (new_count != node_->count);
node_->count = new_count;
node_t **old_table = node_->next.table;
node_->next.table =
(node_t**) malloc (sizeof (node_t*) * node_->count);
xs_assert (node_->next.table);
// Copy the head of the old table; 'min' stays the same.
memmove (node_->next.table, old_table,
sizeof (node_t*) * node_->count);
free (old_table);
}
}
}
return ret;
}
// Recursive worker of enumerate(). Visits 'node_' and its descendants and
// calls xs_filter_subscribed (arg_, prefix, length) for every node that has
// subscribers.
//
//   buff_         scratch buffer holding the prefix of 'node_'; may be
//                 reallocated, hence the double pointer
//   buffsize_     number of valid bytes in the buffer (depth of 'node_')
//   maxbuffsize_  capacity of the buffer as known to this call
void xs::prefix_filter_t::list (node_t *node_, unsigned char **buff_,
    size_t buffsize_, size_t maxbuffsize_, void *arg_)
{
    // Report this node's prefix if somebody is subscribed to it.
    if (node_->subscribers) {
        int rc = xs_filter_subscribed (arg_, *buff_, buffsize_);
        errno_assert (rc == 0);
    }

    // Make sure one more character fits into the buffer.
    if (buffsize_ >= maxbuffsize_) {
        maxbuffsize_ = buffsize_ + 256;
        *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
        xs_assert (*buff_);
    }

    switch (node_->count) {
    case 0:
        // Leaf: nothing below this node.
        return;
    case 1:
        // Single child, stored directly and keyed by 'min'.
        (*buff_) [buffsize_] = node_->min;
        list (node_->next.node, buff_, buffsize_ + 1, maxbuffsize_, arg_);
        return;
    default:
        break;
    }

    // Several children: visit every occupied slot of the node table.
    for (unsigned short slot = 0; slot != node_->count; ++slot) {
        (*buff_) [buffsize_] = node_->min + slot;
        node_t *child = node_->next.table [slot];
        if (child)
            list (child, buff_, buffsize_ + 1, maxbuffsize_, arg_);
    }
}
// A node is redundant when it can be removed from the trie without losing
// information: it has no subscriber set and no live child nodes.
bool xs::prefix_filter_t::is_redundant (node_t *node_)
{
    if (node_->subscribers)
        return false;
    return node_->live_nodes == 0;
}
// Implementation of the C interface of the filter.
// Following functions convert raw C calls into calls to C++ object methods.
// C entry point: returns the identifier of this filter type.
static int id (void *core_)
{
    const int filter_id = XS_FILTER_PREFIX;
    return filter_id;
}
// C entry point: allocates a new prefix filter object and returns it as an
// opaque pointer. Running out of memory is fatal (alloc_assert).
static void *pf_create (void *core_)
{
    void *instance = (void*) new (std::nothrow) xs::prefix_filter_t;
    alloc_assert (instance);
    return instance;
}
static void pf_destroy (void *core_, void *pf_)
{
delete (xs::prefix_filter_t*) pf_;
}
static int pf_subscribe (void *core_, void *pf_, void *subscriber_,
const unsigned char *data_, size_t size_)
{
return ((xs::prefix_filter_t*) pf_)->subscribe (core_, subscriber_,
data_, size_);
}
static int pf_unsubscribe (void *core_, void *pf_, void *subscriber_,
const unsigned char *data_, size_t size_)
{
return ((xs::prefix_filter_t*) pf_)->unsubscribe (core_, subscriber_,
data_, size_);
}
static void pf_unsubscribe_all (void *core_, void *pf_, void *subscriber_)
{
((xs::prefix_filter_t*) pf_)->unsubscribe_all (core_, subscriber_);
}
static void pf_match (void *core_, void *pf_,
const unsigned char *data_, size_t size_)
{
((xs::prefix_filter_t*) pf_)->match_all (core_, data_, size_);
}
// C entry point: allocates a new prefix filter object and returns it as an
// opaque pointer. Running out of memory is fatal (alloc_assert).
static void *sf_create (void *core_)
{
    void *instance = (void*) new (std::nothrow) xs::prefix_filter_t;
    alloc_assert (instance);
    return instance;
}
static void sf_destroy (void *core_, void *sf_)
{
delete (xs::prefix_filter_t*) sf_;
}
static int sf_subscribe (void *core_, void *sf_,
const unsigned char *data_, size_t size_)
{
return ((xs::prefix_filter_t*) sf_)->subscribe (core_, NULL,
data_, size_);
}
static int sf_unsubscribe (void *core_, void *sf_,
const unsigned char *data_, size_t size_)
{
return ((xs::prefix_filter_t*) sf_)->unsubscribe (core_, NULL,
data_, size_);
}
static void sf_enumerate (void *core_, void *sf_)
{
((xs::prefix_filter_t*) sf_)->enumerate (core_);
}
static int sf_match (void *core_, void *sf_,
const unsigned char *data_, size_t size_)
{
return ((xs::prefix_filter_t*) sf_)->match (core_, data_, size_);
}
// Function table through which this filter implementation is exposed.
// The initializer is positional, so the entries must stay in the member
// order of xs_filter_t (declared outside this file - presumably in xs.h).
static xs_filter_t filter = {
// Plug-in type tag.
XS_PLUGIN_FILTER,
// NOTE(review): presumably the version of the plug-in interface - confirm
// against the declaration of xs_filter_t.
1,
// Filter type identifier.
id,
// "pf_" entry points: every call carries a subscriber pointer.
pf_create,
pf_destroy,
pf_subscribe,
pf_unsubscribe,
pf_unsubscribe_all,
pf_match,
// "sf_" entry points: no subscriber pointer; subscriptions are stored
// with a NULL subscriber.
sf_create,
sf_destroy,
sf_subscribe,
sf_unsubscribe,
sf_enumerate,
sf_match,
};
// Opaque handle to the function table above.
void *xs::prefix_filter = static_cast <void*> (&filter);