From d82cbb3a81f116cd22e9895ecac36ac3d7b38929 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 5 Apr 2012 07:32:58 +0200 Subject: XS_PLUGIN and XS_FILTER implementation This patch introduces following features: - XS_PLUGIN context option to add plugins to libxs - XS_FILTER option to switch between different filter types - Automatic loading of plug-ins is *not* implemented. From the implementation point of view: - standard prefix filter is implemented as a pluggable filter - trie_t and mtrie_t are joined into a single class - the code for 0MQ/3.1 compatibility is left in in the form of comments - new test for testing re-subscriptions is added Signed-off-by: Martin Sustrik --- .gitignore | 2 + builds/msvc/libxs/libxs.vcxproj | 8 +- builds/msvc/libxs/libxs.vcxproj.filters | 24 +- builds/msvc/tests/tests.vcxproj | 4 + builds/msvc/tests/tests.vcxproj.filters | 3 + include/xs.h | 49 +++ src/Makefile.am | 8 +- src/core.cpp | 68 ++++ src/core.hpp | 53 +++ src/ctx.cpp | 39 ++ src/ctx.hpp | 14 +- src/mtrie.cpp | 436 --------------------- src/mtrie.hpp | 93 ----- src/object.cpp | 5 + src/object.hpp | 5 + src/options.cpp | 20 + src/options.hpp | 3 + src/prefix_filter.cpp | 664 ++++++++++++++++++++++++++++++++ src/prefix_filter.hpp | 108 ++++++ src/sub.cpp | 19 +- src/trie.cpp | 338 ---------------- src/trie.hpp | 79 ---- src/wire.hpp | 4 + src/xpub.cpp | 153 ++++++-- src/xpub.hpp | 30 +- src/xsub.cpp | 119 +++++- src/xsub.hpp | 29 +- tests/Makefile.am | 4 +- tests/resubscribe.cpp | 103 +++++ tests/tests.cpp | 6 + 30 files changed, 1453 insertions(+), 1037 deletions(-) create mode 100644 src/core.cpp create mode 100644 src/core.hpp delete mode 100644 src/mtrie.cpp delete mode 100644 src/mtrie.hpp create mode 100644 src/prefix_filter.cpp create mode 100644 src/prefix_filter.hpp delete mode 100644 src/trie.cpp delete mode 100644 src/trie.hpp create mode 100644 tests/resubscribe.cpp diff --git a/.gitignore b/.gitignore index 2b25d2a..ce92051 100644 --- a/.gitignore +++ b/.gitignore @@ -40,6 +40,8 @@ tests/max_sockets tests/emptyctx tests/polltimeo tests/wireformat +tests/libzmq21 +tests/resubscribe src/platform.hpp* src/stamp-h1 perf/local_lat diff --git a/builds/msvc/libxs/libxs.vcxproj b/builds/msvc/libxs/libxs.vcxproj index 24eb9ae..bbab5fd 100644 --- a/builds/msvc/libxs/libxs.vcxproj +++ b/builds/msvc/libxs/libxs.vcxproj @@ -104,6 +104,7 @@ + @@ -122,7 +123,6 @@ - @@ -135,6 +135,7 @@ Create + @@ -152,7 +153,6 @@ - @@ -170,6 +170,7 @@ + @@ -191,7 +192,6 @@ - @@ -203,6 +203,7 @@ + @@ -221,7 +222,6 @@ - diff --git a/builds/msvc/libxs/libxs.vcxproj.filters b/builds/msvc/libxs/libxs.vcxproj.filters index db8e66d..eb0f505 100644 --- a/builds/msvc/libxs/libxs.vcxproj.filters +++ b/builds/msvc/libxs/libxs.vcxproj.filters @@ -65,9 +65,6 @@ Source Files - - Source Files - Source Files @@ -149,9 +146,6 @@ Source Files - - Source Files - Source Files @@ -176,6 +170,12 @@ Source Files + + Source Files + + + Source Files + @@ -259,9 +259,6 @@ Header Files - - Header Files - Header Files @@ -349,9 +346,6 @@ Header Files - - Header Files - Header Files @@ -388,5 +382,11 @@ Header Files + + Header Files + + + Header Files + diff --git a/builds/msvc/tests/tests.vcxproj b/builds/msvc/tests/tests.vcxproj index ee4d414..bb707c4 100644 --- a/builds/msvc/tests/tests.vcxproj +++ b/builds/msvc/tests/tests.vcxproj @@ -164,6 +164,10 @@ true true + + true + true + diff --git a/builds/msvc/tests/tests.vcxproj.filters b/builds/msvc/tests/tests.vcxproj.filters index d858166..b1eade2 100644 --- a/builds/msvc/tests/tests.vcxproj.filters +++ b/builds/msvc/tests/tests.vcxproj.filters @@ -62,6 +62,9 @@ Header Files + + Header Files + diff --git a/include/xs.h b/include/xs.h index 7e49df4..6f0004c 100644 --- a/include/xs.h +++ b/include/xs.h @@ -148,6 +148,7 @@ XS_EXPORT int xs_getmsgopt (xs_msg_t *msg, int option, void *optval, #define XS_MAX_SOCKETS 1 #define XS_IO_THREADS 2 +#define XS_PLUGIN 3 XS_EXPORT void *xs_init (); XS_EXPORT int xs_term (void *context); @@ -258,6 +259,54 @@ XS_EXPORT void *xs_stopwatch_start (void); /* the stopwatch was started. */ XS_EXPORT unsigned long xs_stopwatch_stop (void *watch); +/******************************************************************************/ +/* The API for pluggable filters. */ +/* THIS IS EXPERIMENTAL WORK AND MAY CHANGE WITHOUT PRIOR NOTICE. */ +/******************************************************************************/ + +#define XS_FILTER 34 + +#define XS_PLUGIN_FILTER 1 + +#define XS_FILTER_ALL 0 +#define XS_FILTER_PREFIX 1 + +typedef struct +{ + int type; + int version; + + int (*id) (void *core); + void *(*pf_create) (void *core); + void (*pf_destroy) (void *core, void *pf); + int (*pf_subscribe) (void *core, void *pf, void *subscriber, + const unsigned char *data, size_t size); + int (*pf_unsubscribe) (void *core, void *pf, void *subscriber, + const unsigned char *data, size_t size); + void (*pf_unsubscribe_all) (void *core, void *pf, void *subscriber); + void (*pf_match) (void *core, void *pf, + const unsigned char *data, size_t size); + + void *(*sf_create) (void *core); + void (*sf_destroy) (void *core, void *sf); + int (*sf_subscribe) (void *core, void *sf, + const unsigned char *data, size_t size); + int (*sf_unsubscribe) (void *core, void *sf, + const unsigned char *data, size_t size); + void (*sf_enumerate) (void *core, void *sf); + int (*sf_match) (void *core, void *sf, + const unsigned char *data, size_t size); + +} xs_filter_t; + +XS_EXPORT int xs_filter_subscribed (void *core, + const unsigned char *data, size_t size); + +XS_EXPORT int xs_filter_unsubscribed (void *core, + const unsigned char *data, size_t size); + +XS_EXPORT int xs_filter_matching (void *core, void *subscriber); + #undef XS_EXPORT #ifdef __cplusplus diff --git a/src/Makefile.am b/src/Makefile.am index fdfa8d1..3c260be 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -23,6 +23,7 @@ libxs_la_SOURCES = \ clock.hpp \ command.hpp \ config.hpp \ + core.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ @@ -44,7 +45,6 @@ libxs_la_SOURCES = \ likely.hpp \ mailbox.hpp \ msg.hpp \ - mtrie.hpp \ mutex.hpp \ object.hpp \ options.hpp \ @@ -56,6 +56,7 @@ libxs_la_SOURCES = \ platform.hpp \ poll.hpp \ pair.hpp \ + prefix_filter.hpp \ pub.hpp \ pull.hpp \ push.hpp \ @@ -74,7 +75,6 @@ libxs_la_SOURCES = \ tcp_connecter.hpp \ tcp_listener.hpp \ thread.hpp \ - trie.hpp \ upoll.hpp \ windows.hpp \ wire.hpp \ @@ -85,6 +85,7 @@ libxs_la_SOURCES = \ ypipe.hpp \ yqueue.hpp \ clock.cpp \ + core.cpp \ ctx.cpp \ decoder.cpp \ devpoll.cpp \ @@ -103,7 +104,6 @@ libxs_la_SOURCES = \ lb.cpp \ mailbox.cpp \ msg.cpp \ - mtrie.cpp \ object.cpp \ options.cpp \ own.cpp \ @@ -113,6 +113,7 @@ libxs_la_SOURCES = \ pgm_socket.cpp \ pipe.cpp \ poll.cpp \ + prefix_filter.cpp \ pull.cpp \ push.cpp \ reaper.cpp \ @@ -130,7 +131,6 @@ libxs_la_SOURCES = \ tcp_connecter.cpp \ tcp_listener.cpp \ thread.cpp \ - trie.cpp \ upoll.cpp \ xpub.cpp \ xrep.cpp \ diff --git a/src/core.cpp b/src/core.cpp new file mode 100644 index 0000000..2d16167 --- /dev/null +++ b/src/core.cpp @@ -0,0 +1,68 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "../include/xs.h" + +#include "core.hpp" +#include "err.hpp" + +xs::core_t::core_t () +{ +} + +xs::core_t::~core_t () +{ +} + +int xs::core_t::filter_subscribed (const unsigned char *data_, size_t size_) +{ + errno = ENOTSUP; + return -1; +} + +int xs::core_t::filter_unsubscribed (const unsigned char *data_, size_t size_) +{ + errno = ENOTSUP; + return -1; +} + +int xs::core_t::filter_matching (void *subscriber_) +{ + errno = ENOTSUP; + return -1; +} + +int xs_filter_subscribed (void *core_, + const unsigned char *data_, size_t size_) +{ + return ((xs::core_t*) core_)->filter_subscribed (data_, size_); +} + +int xs_filter_unsubscribed (void *core_, + const unsigned char *data_, size_t size_) +{ + return ((xs::core_t*) core_)->filter_unsubscribed (data_, size_); +} + +int xs_filter_matching (void *core_, void *subscriber_) +{ + return ((xs::core_t*) core_)->filter_matching (subscriber_); +} + diff --git a/src/core.hpp b/src/core.hpp new file mode 100644 index 0000000..fb50646 --- /dev/null +++ b/src/core.hpp @@ -0,0 +1,53 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __XS_CORE_HPP_INCLUDED__ +#define __XS_CORE_HPP_INCLUDED__ + +#include + +namespace xs +{ + + // This class is not a core of Crossroads. It's rather a callback interface + // for extensions, ie. what's extensions see as Crossroads core. + + class core_t + { + public: + + core_t (); + virtual ~core_t (); + + virtual int filter_subscribed (const unsigned char *data_, + size_t size_); + virtual int filter_unsubscribed (const unsigned char *data_, + size_t size_); + virtual int filter_matching (void *subscriber_); + + private: + + core_t (const core_t&); + const core_t &operator = (const core_t&); + }; + +} + +#endif diff --git a/src/ctx.cpp b/src/ctx.cpp index 21dec58..46fa984 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -35,6 +35,7 @@ #include "pipe.hpp" #include "err.hpp" #include "msg.hpp" +#include "prefix_filter.hpp" xs::ctx_t::ctx_t () : tag (0xbadcafe0), @@ -46,6 +47,9 @@ xs::ctx_t::ctx_t () : max_sockets (512), io_thread_count (1) { + // Plug in the standard plugins. + int rc = plug (prefix_filter); + xs_assert (rc == 0); } bool xs::ctx_t::check_tag () @@ -124,6 +128,28 @@ int xs::ctx_t::terminate () return 0; } +int xs::ctx_t::plug (const void *ext_) +{ + if (!ext_) { + errno = EFAULT; + return -1; + } + + // The extension is a message filter plug-in. + xs_filter_t *filter = (xs_filter_t*) ext_; + if (filter->type == XS_PLUGIN_FILTER && filter->version == 1) { + opt_sync.lock (); + filters [filter->id (NULL)] = filter; + opt_sync.unlock (); + return 0; + } + + // Specified extension type is not supported by this version of + // the library. + errno = ENOTSUP; + return -1; +} + int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) { switch (option_) { @@ -145,6 +171,8 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) io_thread_count = *((int*) optval_); opt_sync.unlock (); break; + case XS_PLUGIN: + return plug (optval_); default: errno = EINVAL; return -1; @@ -256,6 +284,17 @@ xs::object_t *xs::ctx_t::get_reaper () return reaper; } +xs_filter_t *xs::ctx_t::get_filter (int filter_id_) +{ + xs_filter_t *result = NULL; + opt_sync.lock (); + filters_t::iterator it = filters.find (filter_id_); + if (it != filters.end ()) + result = it->second; + opt_sync.unlock (); + return result; +} + void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_) { slots [tid_]->send (command_); diff --git a/src/ctx.hpp b/src/ctx.hpp index c4fa96a..54144ad 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -25,7 +25,8 @@ #include #include #include -#include + +#include "../include/xs.h" #include "mailbox.hpp" #include "array.hpp" @@ -88,6 +89,10 @@ namespace xs // Returns reaper thread object. xs::object_t *get_reaper (); + // Get the filter associated with the specified filter ID or NULL + // If such filter is not registered. + xs_filter_t *get_filter (int filter_id_); + // Management of inproc endpoints. int register_endpoint (const char *addr_, endpoint_t &endpoint_); void unregister_endpoints (xs::socket_base_t *socket_); @@ -102,6 +107,9 @@ namespace xs private: + // Plug in the extension specified. + int plug (const void *ext_); + // Used to check whether the object is a context. uint32_t tag; @@ -161,6 +169,10 @@ namespace xs // Synchronisation of access to context options. mutex_t opt_sync; + // List of all filters plugged into the context. + typedef std::map filters_t; + filters_t filters; + ctx_t (const ctx_t&); const ctx_t &operator = (const ctx_t&); }; diff --git a/src/mtrie.cpp b/src/mtrie.cpp deleted file mode 100644 index eae34c2..0000000 --- a/src/mtrie.cpp +++ /dev/null @@ -1,436 +0,0 @@ -/* - Copyright (c) 2011-2012 250bpm s.r.o. - Copyright (c) 2011-2012 Spotify AB - Copyright (c) 2011 Other contributors as noted in the AUTHORS file - - This file is part of Crossroads I/O project. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - Crossroads is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include - -#include -#include - -#include "platform.hpp" -#if defined XS_HAVE_WINDOWS -#include "windows.hpp" -#endif - -#include "err.hpp" -#include "pipe.hpp" -#include "mtrie.hpp" - -xs::mtrie_t::mtrie_t () : - pipes (0), - min (0), - count (0), - live_nodes (0) -{ -} - -xs::mtrie_t::~mtrie_t () -{ - if (pipes) { - delete pipes; - pipes = 0; - } - - if (count == 1) { - xs_assert (next.node); - delete next.node; - next.node = 0; - } - else if (count > 1) { - for (unsigned short i = 0; i != count; ++i) - if (next.table [i]) - delete next.table [i]; - free (next.table); - } -} - -bool xs::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_) -{ - return add_helper (prefix_, size_, pipe_); -} - -bool xs::mtrie_t::add_helper (unsigned char *prefix_, size_t size_, - pipe_t *pipe_) -{ - // We are at the node corresponding to the prefix. We are done. - if (!size_) { - bool result = !pipes; - if (!pipes) - pipes = new pipes_t; - pipes->insert (pipe_); - return result; - } - - unsigned char c = *prefix_; - if (c < min || c >= min + count) { - - // The character is out of range of currently handled - // charcters. We have to extend the table. - if (!count) { - min = c; - count = 1; - next.node = NULL; - } - else if (count == 1) { - unsigned char oldc = min; - mtrie_t *oldp = next.node; - count = (min < c ? c - min : min - c) + 1; - next.table = (mtrie_t**) - malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - for (unsigned short i = 0; i != count; ++i) - next.table [i] = 0; - min = std::min (min, c); - next.table [oldc - min] = oldp; - } - else if (min < c) { - - // The new character is above the current character range. - unsigned short old_count = count; - count = c - min + 1; - next.table = (mtrie_t**) realloc ((void*) next.table, - sizeof (mtrie_t*) * count); - xs_assert (next.table); - for (unsigned short i = old_count; i != count; i++) - next.table [i] = NULL; - } - else { - - // The new character is below the current character range. - unsigned short old_count = count; - count = (min + old_count) - c; - next.table = (mtrie_t**) realloc ((void*) next.table, - sizeof (mtrie_t*) * count); - xs_assert (next.table); - memmove (next.table + min - c, next.table, - old_count * sizeof (mtrie_t*)); - for (unsigned short i = 0; i != min - c; i++) - next.table [i] = NULL; - min = c; - } - } - - // If next node does not exist, create one. - if (count == 1) { - if (!next.node) { - next.node = new (std::nothrow) mtrie_t; - ++live_nodes; - xs_assert (next.node); - } - return next.node->add_helper (prefix_ + 1, size_ - 1, pipe_); - } - else { - if (!next.table [c - min]) { - next.table [c - min] = new (std::nothrow) mtrie_t; - ++live_nodes; - xs_assert (next.table [c - min]); - } - return next.table [c - min]->add_helper (prefix_ + 1, size_ - 1, pipe_); - } -} - - -void xs::mtrie_t::rm (pipe_t *pipe_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_) -{ - unsigned char *buff = NULL; - rm_helper (pipe_, &buff, 0, 0, func_, arg_); - free (buff); -} - -void xs::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, - size_t buffsize_, size_t maxbuffsize_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_) -{ - // Remove the subscription from this node. - if (pipes && pipes->erase (pipe_) && pipes->empty ()) { - func_ (*buff_, buffsize_, arg_); - delete pipes; - pipes = 0; - } - - // Adjust the buffer. - if (buffsize_ >= maxbuffsize_) { - maxbuffsize_ = buffsize_ + 256; - *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); - alloc_assert (*buff_); - } - - // If there are no subnodes in the trie, return. - if (count == 0) - return; - - // If there's one subnode (optimisation). - if (count == 1) { - (*buff_) [buffsize_] = min; - buffsize_++; - next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, - func_, arg_); - - // Prune the node if it was made redundant by the removal - if (next.node->is_redundant ()) { - delete next.node; - next.node = 0; - count = 0; - --live_nodes; - xs_assert (live_nodes == 0); - } - return; - } - - // If there are multiple subnodes. - // - // New min non-null character in the node table after the removal - unsigned char new_min = min + count - 1; - // New max non-null character in the node table after the removal - unsigned char new_max = min; - for (unsigned short c = 0; c != count; c++) { - (*buff_) [buffsize_] = min + c; - if (next.table [c]) { - next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1, - maxbuffsize_, func_, arg_); - - // Prune redundant nodes from the mtrie - if (next.table [c]->is_redundant ()) { - delete next.table [c]; - next.table [c] = 0; - - xs_assert (live_nodes > 0); - --live_nodes; - } - else { - // The node is not redundant, so it's a candidate for being - // the new min/max node. - // - // We loop through the node array from left to right, so the - // first non-null, non-redundant node encountered is the new - // minimum index. Conversely, the last non-redundant, non-null - // node encountered is the new maximum index. - if (c + min < new_min) - new_min = c + min; - if (c + min > new_max) - new_max = c + min; - } - } - } - - xs_assert (count > 1); - - // Compact the node table if possible - if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - xs_assert (new_min == new_max); - xs_assert (new_min >= min && new_min < min + count); - mtrie_t *node = next.table [new_min - min]; - xs_assert (node); - free (next.table); - next.node = node; - count = 1; - min = new_min; - } - else if (live_nodes > 1 && (new_min > min || new_max < min + count - 1)) { - xs_assert (new_max - new_min + 1 > 1); - - mtrie_t **old_table = next.table; - xs_assert (new_min > min || new_max < min + count - 1); - xs_assert (new_min >= min); - xs_assert (new_max <= min + count - 1); - xs_assert (new_max - new_min + 1 < count); - - count = new_max - new_min + 1; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - - memmove (next.table, old_table + (new_min - min), - sizeof (mtrie_t*) * count); - free (old_table); - - min = new_min; - } -} - -bool xs::mtrie_t::rm (unsigned char *prefix_, size_t size_, pipe_t *pipe_) -{ - return rm_helper (prefix_, size_, pipe_); -} - -bool xs::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_, - pipe_t *pipe_) -{ - if (!size_) { - if (pipes) { - pipes_t::size_type erased = pipes->erase (pipe_); - xs_assert (erased == 1); - if (pipes->empty ()) { - delete pipes; - pipes = 0; - } - } - return !pipes; - } - - unsigned char c = *prefix_; - if (!count || c < min || c >= min + count) - return false; - - mtrie_t *next_node = - count == 1 ? next.node : next.table [c - min]; - - if (!next_node) - return false; - - bool ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_); - - if (next_node->is_redundant ()) { - delete next_node; - xs_assert (count > 0); - - if (count == 1) { - next.node = 0; - count = 0; - --live_nodes; - xs_assert (live_nodes == 0); - } - else { - next.table [c - min] = 0; - xs_assert (live_nodes > 1); - --live_nodes; - - // Compact the table if possible - if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - mtrie_t *node = 0; - for (unsigned short i = 0; i < count; ++i) { - if (next.table [i]) { - node = next.table [i]; - min = i + min; - break; - } - } - - xs_assert (node); - free (next.table); - next.node = node; - count = 1; - } - else if (c == min) { - // We can compact the table "from the left" - unsigned char new_min = min; - for (unsigned short i = 1; i < count; ++i) { - if (next.table [i]) { - new_min = i + min; - break; - } - } - xs_assert (new_min != min); - - mtrie_t **old_table = next.table; - xs_assert (new_min > min); - xs_assert (count > new_min - min); - - count = count - (new_min - min); - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - - memmove (next.table, old_table + (new_min - min), - sizeof (mtrie_t*) * count); - free (old_table); - - min = new_min; - } - else if (c == min + count - 1) { - // We can compact the table "from the right" - unsigned short new_count = count; - for (unsigned short i = 1; i < count; ++i) { - if (next.table [count - 1 - i]) { - new_count = count - i; - break; - } - } - xs_assert (new_count != count); - count = new_count; - - mtrie_t **old_table = next.table; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - - memmove (next.table, old_table, sizeof (mtrie_t*) * count); - free (old_table); - } - } - } - - return ret; -} - -void xs::mtrie_t::match (unsigned char *data_, size_t size_, - void (*func_) (pipe_t *pipe_, void *arg_), void *arg_) -{ - mtrie_t *current = this; - while (true) { - - // Signal the pipes attached to this node. - if (current->pipes) { - for (pipes_t::iterator it = current->pipes->begin (); - it != current->pipes->end (); ++it) - func_ (*it, arg_); - } - - // If we are at the end of the message, there's nothing more to match. - if (!size_) - break; - - // If there are no subnodes in the trie, return. - if (current->count == 0) - break; - - // If there's one subnode (optimisation). - if (current->count == 1) { - if (data_ [0] != current->min) - break; - current = current->next.node; - data_++; - size_--; - continue; - } - - // If there are multiple subnodes. - if (data_ [0] < current->min || data_ [0] >= - current->min + current->count) - break; - if (!current->next.table [data_ [0] - current->min]) - break; - current = current->next.table [data_ [0] - current->min]; - data_++; - size_--; - } -} - -bool xs::mtrie_t::is_redundant () const -{ - return !pipes && live_nodes == 0; -} - diff --git a/src/mtrie.hpp b/src/mtrie.hpp deleted file mode 100644 index 1e56b4e..0000000 --- a/src/mtrie.hpp +++ /dev/null @@ -1,93 +0,0 @@ -/* - Copyright (c) 2011-2012 250bpm s.r.o. - Copyright (c) 2011-2012 Spotify AB - Copyright (c) 2011 Other contributors as noted in the AUTHORS file - - This file is part of Crossroads I/O project. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - Crossroads is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __XS_MTRIE_HPP_INCLUDED__ -#define __XS_MTRIE_HPP_INCLUDED__ - -#include -#include - -#include "stdint.hpp" - -namespace xs -{ - - class pipe_t; - - // Multi-trie. Each node in the trie is a set of pointers to pipes. - - class mtrie_t - { - public: - - mtrie_t (); - ~mtrie_t (); - - // Add key to the trie. Returns true if it's a new subscription - // rather than a duplicate. - bool add (unsigned char *prefix_, size_t size_, xs::pipe_t *pipe_); - - // Remove all subscriptions for a specific peer from the trie. - // If there are no subscriptions left on some topics, invoke the - // supplied callback function. - void rm (xs::pipe_t *pipe_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_); - - // Remove specific subscription from the trie. Return true is it was - // actually removed rather than de-duplicated. - bool rm (unsigned char *prefix_, size_t size_, xs::pipe_t *pipe_); - - // Signal all the matching pipes. - void match (unsigned char *data_, size_t size_, - void (*func_) (xs::pipe_t *pipe_, void *arg_), void *arg_); - - private: - - bool add_helper (unsigned char *prefix_, size_t size_, - xs::pipe_t *pipe_); - void rm_helper (xs::pipe_t *pipe_, unsigned char **buff_, - size_t buffsize_, size_t maxbuffsize_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_); - bool rm_helper (unsigned char *prefix_, size_t size_, - xs::pipe_t *pipe_); - bool is_redundant () const; - - typedef std::set pipes_t; - pipes_t *pipes; - - unsigned char min; - unsigned short count; - unsigned short live_nodes; - union { - class mtrie_t *node; - class mtrie_t **table; - } next; - - mtrie_t (const mtrie_t&); - const mtrie_t &operator = (const mtrie_t&); - }; - -} - -#endif - diff --git a/src/object.cpp b/src/object.cpp index 5c1ed84..4f04fe4 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -154,6 +154,11 @@ xs::io_thread_t *xs::object_t::choose_io_thread (uint64_t affinity_) return ctx->choose_io_thread (affinity_); } +xs_filter_t *xs::object_t::get_filter (int filter_id_) +{ + return ctx->get_filter (filter_id_); +} + void xs::object_t::send_stop () { // 'stop' command goes always from administrative thread to diff --git a/src/object.hpp b/src/object.hpp index 5b855a5..fabb156 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -22,6 +22,8 @@ #ifndef __XS_OBJECT_HPP_INCLUDED__ #define __XS_OBJECT_HPP_INCLUDED__ +#include "../include/xs.h" + #include "stdint.hpp" namespace xs @@ -64,6 +66,9 @@ namespace xs // Chooses least loaded I/O thread. xs::io_thread_t *choose_io_thread (uint64_t affinity_); + // Functions related to extensions. + xs_filter_t *get_filter (int filter_id_); + // Derived object can use these functions to send commands // to other objects. void send_stop (); diff --git a/src/options.cpp b/src/options.cpp index c9cbaae..fdbffde 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -23,6 +23,8 @@ #include #include +#include "../include/xs.h" + #include "options.hpp" #include "err.hpp" @@ -47,6 +49,7 @@ xs::options_t::options_t () : ipv4only (1), keepalive (0), protocol (0), + filter_id (XS_FILTER_PREFIX), delay_on_close (true), delay_on_disconnect (true), filter (false), @@ -248,6 +251,14 @@ int xs::options_t::setsockopt (int option_, const void *optval_, return 0; } + case XS_FILTER: + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + filter_id = *((int*) optval_); + return 0; + } errno = EINVAL; @@ -438,6 +449,15 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (int); return 0; + case XS_FILTER: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = filter_id; + *optvallen_ = sizeof (int); + return 0; + } errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index c1e4dda..1288f72 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -95,6 +95,9 @@ namespace xs // Version of wire protocol to use. int protocol; + // Filter ID to be used with subscriptions and unsubscriptions. + int filter_id; + // If true, session reads all the pending messages from the pipe and // sends them to the network when socket is closed. bool delay_on_close; diff --git a/src/prefix_filter.cpp b/src/prefix_filter.cpp new file mode 100644 index 0000000..5d21fb6 --- /dev/null +++ b/src/prefix_filter.cpp @@ -0,0 +1,664 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2011-2012 Spotify AB + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include + +#include "../include/xs.h" + +#include "prefix_filter.hpp" +#include "err.hpp" + +xs::prefix_filter_t::prefix_filter_t () +{ + init (&root); +} + +xs::prefix_filter_t::~prefix_filter_t () +{ + close (&root); +} + +int xs::prefix_filter_t::subscribe (void *core_, void *subscriber_, + const unsigned char *data_, size_t size_) +{ + return add (&root, data_, size_, subscriber_) ? 1 : 0; +} + +int xs::prefix_filter_t::unsubscribe (void *core_, void *subscriber_, + const unsigned char *data_, size_t size_) +{ + return rm (&root, data_, size_, subscriber_) ? 1 : 0; +} + +void xs::prefix_filter_t::unsubscribe_all (void *core_, void *subscriber_) +{ + rm (&root, subscriber_, core_); +} + +void xs::prefix_filter_t::enumerate (void *core_) +{ + unsigned char *buff = NULL; + list (&root, &buff, 0, 0, core_); + free (buff); +} + +int xs::prefix_filter_t::match (void *core_, + const unsigned char *data_, size_t size_) +{ + // This function is on critical path. It deliberately doesn't use + // recursion to get a bit better performance. + node_t *current = &root; + while (true) { + + // We've found a corresponding subscription! + if (current->subscribers) + return 1; + + // We've checked all the data and haven't found matching subscription. + if (!size_) + return 0; + + // If there's no corresponding slot for the first character + // of the prefix, the message does not match. + unsigned char c = *data_; + if (c < current->min || c >= current->min + current->count) + return 0; + + // Move to the next character. + if (current->count == 1) + current = current->next.node; + else { + current = current->next.table [c - current->min]; + if (!current) + return 0; + } + data_++; + size_--; + } + +} + +void xs::prefix_filter_t::match_all (void *core_, + const unsigned char *data_, size_t size_) +{ + node_t *current = &root; + while (true) { + + // Signal the subscribers attached to this node. + if (current->subscribers) { + for (node_t::subscribers_t::iterator it = + current->subscribers->begin (); + it != current->subscribers->end (); ++it) { + int rc = xs_filter_matching (core_, it->first); + errno_assert (rc == 0); + } + } + + // If we are at the end of the message, there's nothing more to match. + if (!size_) + break; + + // If there are no subnodes in the trie, return. + if (current->count == 0) + break; + + // If there's one subnode (optimisation). + if (current->count == 1) { + if (data_ [0] != current->min) + break; + current = current->next.node; + data_++; + size_--; + continue; + } + + // If there are multiple subnodes. + if (data_ [0] < current->min || data_ [0] >= + current->min + current->count) + break; + if (!current->next.table [data_ [0] - current->min]) + break; + current = current->next.table [data_ [0] - current->min]; + data_++; + size_--; + } +} + +void xs::prefix_filter_t::init (node_t *node_) +{ + node_->subscribers = NULL; + node_->min = 0; + node_->count = 0; + node_->live_nodes = 0; +} + +void xs::prefix_filter_t::close (node_t *node_) +{ + if (node_->subscribers) { + delete node_->subscribers; + node_->subscribers = NULL; + } + + if (node_->count == 1) { + xs_assert (node_->next.node); + close (node_->next.node); + delete node_->next.node; + node_->next.node = NULL; + } + else if (node_->count > 1) { + for (unsigned short i = 0; i != node_->count; ++i) + if (node_->next.table [i]) { + close (node_->next.table [i]); + delete node_->next.table [i]; + } + free (node_->next.table); + } +} + +bool xs::prefix_filter_t::add (node_t *node_, const unsigned char *prefix_, + size_t size_, void *subscriber_) +{ + // We are at the node corresponding to the prefix. We are done. + if (!size_) { + bool result = !node_->subscribers; + if (!node_->subscribers) + node_->subscribers = new (std::nothrow) node_t::subscribers_t; + node_t::subscribers_t::iterator it = node_->subscribers->insert ( + node_t::subscribers_t::value_type (subscriber_, 0)).first; + ++it->second; + return result; + } + + unsigned char c = *prefix_; + if (c < node_->min || c >= node_->min + node_->count) { + + // The character is out of range of currently handled + // charcters. We have to extend the table. + if (!node_->count) { + node_->min = c; + node_->count = 1; + node_->next.node = NULL; + } + else if (node_->count == 1) { + unsigned char oldc = node_->min; + node_t *oldp = node_->next.node; + node_->count = + (node_->min < c ? c - node_->min : node_->min - c) + 1; + node_->next.table = (node_t**) + malloc (sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + for (unsigned short i = 0; i != node_->count; ++i) + node_->next.table [i] = 0; + node_->min = std::min (node_->min, c); + node_->next.table [oldc - node_->min] = oldp; + } + else if (node_->min < c) { + + // The new character is above the current character range. + unsigned short old_count = node_->count; + node_->count = c - node_->min + 1; + node_->next.table = (node_t**) realloc ((void*) node_->next.table, + sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + for (unsigned short i = old_count; i != node_->count; i++) + node_->next.table [i] = NULL; + } + else { + + // The new character is below the current character range. + unsigned short old_count = node_->count; + node_->count = (node_->min + old_count) - c; + node_->next.table = (node_t**) realloc ((void*) node_->next.table, + sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + memmove (node_->next.table + node_->min - c, node_->next.table, + old_count * sizeof (node_t*)); + for (unsigned short i = 0; i != node_->min - c; i++) + node_->next.table [i] = NULL; + node_->min = c; + } + } + + // If next node does not exist, create one. + if (node_->count == 1) { + if (!node_->next.node) { + node_->next.node = new (std::nothrow) node_t; + alloc_assert (node_->next.node); + init (node_->next.node); + ++node_->live_nodes; + xs_assert (node_->next.node); + } + return add (node_->next.node, prefix_ + 1, size_ - 1, subscriber_); + } + else { + if (!node_->next.table [c - node_->min]) { + node_->next.table [c - node_->min] = new (std::nothrow) node_t; + alloc_assert (node_->next.table [c - node_->min]); + init (node_->next.table [c - node_->min]); + ++node_->live_nodes; + xs_assert (node_->next.table [c - node_->min]); + } + return add (node_->next.table [c - node_->min], prefix_ + 1, size_ - 1, + subscriber_); + } +} + + +void xs::prefix_filter_t::rm (node_t *node_, void *subscriber_, void *arg_) +{ + unsigned char *buff = NULL; + rm_helper (node_, subscriber_, &buff, 0, 0, arg_); + free (buff); +} + +void xs::prefix_filter_t::rm_helper (node_t *node_, void *subscribers_, + unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_, void *arg_) +{ + // Remove the subscription from this node. + if (node_->subscribers) { + node_t::subscribers_t::iterator it = + node_->subscribers->find (subscribers_); + if (it != node_->subscribers->end ()) { + xs_assert (it->second); + --it->second; + if (!it->second) { + node_->subscribers->erase (it); + if (node_->subscribers->empty ()) { + int rc = xs_filter_unsubscribed (arg_, *buff_, buffsize_); + errno_assert (rc == 0); + delete node_->subscribers; + node_->subscribers = 0; + } + } + } + } + + // Adjust the buffer. + if (buffsize_ >= maxbuffsize_) { + maxbuffsize_ = buffsize_ + 256; + *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); + alloc_assert (*buff_); + } + + // If there are no subnodes in the trie, return. + if (node_->count == 0) + return; + + // If there's one subnode (optimisation). + if (node_->count == 1) { + (*buff_) [buffsize_] = node_->min; + buffsize_++; + rm_helper (node_->next.node, subscribers_, buff_, buffsize_, + maxbuffsize_, arg_); + + // Prune the node if it was made redundant by the removal + if (is_redundant (node_->next.node)) { + close (node_->next.node); + delete node_->next.node; + node_->next.node = 0; + node_->count = 0; + --node_->live_nodes; + xs_assert (node_->live_nodes == 0); + } + return; + } + + // If there are multiple subnodes. + // + // New min non-null character in the node table after the removal. + unsigned char new_min = node_->min; + // New max non-null character in the node table after the removal. + unsigned char new_max = node_->min + node_->count - 1; + for (unsigned short c = 0; c != node_->count; c++) { + (*buff_) [buffsize_] = node_->min + c; + if (node_->next.table [c]) { + rm_helper (node_->next.table [c], subscribers_, buff_, + buffsize_ + 1, maxbuffsize_, arg_); + + // Prune redudant nodes from the the trie. + if (is_redundant (node_->next.table [c])) { + close (node_->next.table [c]); + delete node_->next.table [c]; + node_->next.table [c] = 0; + + xs_assert (node_->live_nodes > 0); + --node_->live_nodes; + } + else { + if (c + node_->min > new_min) + new_min = c + node_->min; + if (c + node_->min < new_max) + new_max = c + node_->min; + } + } + } + + xs_assert (node_->count > 1); + + // Compact the node table if possible. + if (node_->live_nodes == 1) { + + // If there's only one live node in the table we can + // switch to using the more compact single-node + // representation. + xs_assert (new_min == new_max); + xs_assert (new_min >= node_->min && + new_min < node_->min + node_->count); + node_t *node = node_->next.table [new_min - node_->min]; + xs_assert (node); + free (node_->next.table); + node_->next.node = node; + node_->count = 1; + node_->min = new_min; + } + else if (node_->live_nodes > 1 && + (new_min > node_->min || new_max < node_->min + node_->count - 1)) { + xs_assert (new_max - new_min + 1 > 1); + + node_t **old_table = node_->next.table; + xs_assert (new_min > node_->min || + new_max < node_->min + node_->count - 1); + xs_assert (new_min >= node_->min); + xs_assert (new_max <= node_->min + node_->count - 1); + xs_assert (new_max - new_min + 1 < node_->count); + + node_->count = new_max - new_min + 1; + node_->next.table = (node_t**) malloc (sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + + memmove (node_->next.table, old_table + (new_min - node_->min), + sizeof (node_t*) * node_->count); + free (old_table); + + node_->min = new_min; + } +} + +bool xs::prefix_filter_t::rm (node_t *node_, const unsigned char *prefix_, + size_t size_, void *subscriber_) +{ + if (!size_) { + + // Remove the subscription from this node. + if (node_->subscribers) { + node_t::subscribers_t::iterator it = + node_->subscribers->find (subscriber_); + if (it != node_->subscribers->end ()) { + xs_assert (it->second); + --it->second; + if (!it->second) { + node_->subscribers->erase (it); + if (node_->subscribers->empty ()) { + delete node_->subscribers; + node_->subscribers = 0; + } + } + } + } + return !node_->subscribers; + } + + unsigned char c = *prefix_; + if (!node_->count || c < node_->min || c >= node_->min + node_->count) + return false; + + node_t *next_node = node_->count == 1 ? node_->next.node : + node_->next.table [c - node_->min]; + + if (!next_node) + return false; + + bool ret = rm (next_node, prefix_ + 1, size_ - 1, subscriber_); + + if (is_redundant (next_node)) { + close (next_node); + delete next_node; + xs_assert (node_->count > 0); + + if (node_->count == 1) { + node_->next.node = 0; + node_->count = 0; + --node_->live_nodes; + xs_assert (node_->live_nodes == 0); + } + else { + node_->next.table [c - node_->min] = 0; + xs_assert (node_->live_nodes > 1); + --node_->live_nodes; + + // Compact the table if possible. + if (node_->live_nodes == 1) { + // If there's only one live node in the table we can + // switch to using the more compact single-node + // representation + node_t *node = 0; + for (unsigned short i = 0; i < node_->count; ++i) { + if (node_->next.table [i]) { + node = node_->next.table [i]; + node_->min += i; + break; + } + } + + xs_assert (node); + free (node_->next.table); + node_->next.node = node; + node_->count = 1; + } + else if (c == node_->min) { + + // We can compact the table "from the left". + unsigned char new_min = node_->min; + for (unsigned short i = 1; i < node_->count; ++i) { + if (node_->next.table [i]) { + new_min = i + node_->min; + break; + } + } + xs_assert (new_min != node_->min); + + node_t **old_table = node_->next.table; + xs_assert (new_min > node_->min); + xs_assert (node_->count > new_min - node_->min); + + node_->count = node_->count - (new_min - node_->min); + node_->next.table = + (node_t**) malloc (sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + + memmove (node_->next.table, old_table + (new_min - node_->min), + sizeof (node_t*) * node_->count); + free (old_table); + + node_->min = new_min; + } + else if (c == node_->min + node_->count - 1) { + + // We can compact the table "from the right". + unsigned short new_count = node_->count; + for (unsigned short i = 1; i < node_->count; ++i) { + if (node_->next.table [node_->count - 1 - i]) { + new_count = node_->count - i; + break; + } + } + xs_assert (new_count != node_->count); + node_->count = new_count; + + node_t **old_table = node_->next.table; + node_->next.table = + (node_t**) malloc (sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + + memmove (node_->next.table, old_table, + sizeof (node_t*) * node_->count); + free (old_table); + } + } + } + + return ret; +} + +void xs::prefix_filter_t::list (node_t *node_, unsigned char **buff_, + size_t buffsize_, size_t maxbuffsize_, void *arg_) +{ + // If this node is a subscription, apply the function. + if (node_->subscribers) { + int rc = xs_filter_subscribed (arg_, *buff_, buffsize_); + errno_assert (rc == 0); + } + + // Adjust the buffer. + if (buffsize_ >= maxbuffsize_) { + maxbuffsize_ = buffsize_ + 256; + *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); + xs_assert (*buff_); + } + + // If there are no subnodes in the trie, return. + if (node_->count == 0) + return; + + // If there's one subnode (optimisation). + if (node_->count == 1) { + (*buff_) [buffsize_] = node_->min; + buffsize_++; + list (node_->next.node, buff_, buffsize_, maxbuffsize_, arg_); + return; + } + + // If there are multiple subnodes. + for (unsigned short c = 0; c != node_->count; c++) { + (*buff_) [buffsize_] = node_->min + c; + if (node_->next.table [c]) + list (node_->next.table [c], buff_, buffsize_ + 1, maxbuffsize_, + arg_); + } +} + +bool xs::prefix_filter_t::is_redundant (node_t *node_) +{ + return !node_->subscribers && node_->live_nodes == 0; +} + +// Implementation of the C interface of the filter. +// Following functions convert raw C calls into calls to C++ object methods. + +static int id (void *core_) +{ + return XS_FILTER_PREFIX; +} + +static void *pf_create (void *core_) +{ + void *pf = (void*) new (std::nothrow) xs::prefix_filter_t; + alloc_assert (pf); + return pf; +} + +static void pf_destroy (void *core_, void *pf_) +{ + delete (xs::prefix_filter_t*) pf_; +} + +static int pf_subscribe (void *core_, void *pf_, void *subscriber_, + const unsigned char *data_, size_t size_) +{ + return ((xs::prefix_filter_t*) pf_)->subscribe (core_, subscriber_, + data_, size_); +} + +static int pf_unsubscribe (void *core_, void *pf_, void *subscriber_, + const unsigned char *data_, size_t size_) +{ + return ((xs::prefix_filter_t*) pf_)->unsubscribe (core_, subscriber_, + data_, size_); +} + +static void pf_unsubscribe_all (void *core_, void *pf_, void *subscriber_) +{ + ((xs::prefix_filter_t*) pf_)->unsubscribe_all (core_, subscriber_); +} + +static void pf_match (void *core_, void *pf_, + const unsigned char *data_, size_t size_) +{ + ((xs::prefix_filter_t*) pf_)->match_all (core_, data_, size_); +} + +static void *sf_create (void *core_) +{ + void *sf = (void*) new (std::nothrow) xs::prefix_filter_t; + alloc_assert (sf); + return sf; +} + +static void sf_destroy (void *core_, void *sf_) +{ + delete (xs::prefix_filter_t*) sf_; +} + +static int sf_subscribe (void *core_, void *sf_, + const unsigned char *data_, size_t size_) +{ + return ((xs::prefix_filter_t*) sf_)->subscribe (core_, NULL, + data_, size_); +} + +static int sf_unsubscribe (void *core_, void *sf_, + const unsigned char *data_, size_t size_) +{ + return ((xs::prefix_filter_t*) sf_)->unsubscribe (core_, NULL, + data_, size_); +} + +static void sf_enumerate (void *core_, void *sf_) +{ + ((xs::prefix_filter_t*) sf_)->enumerate (core_); +} + +static int sf_match (void *core_, void *sf_, + const unsigned char *data_, size_t size_) +{ + return ((xs::prefix_filter_t*) sf_)->match (core_, data_, size_); +} + +static xs_filter_t filter = { + XS_PLUGIN_FILTER, + 1, + id, + pf_create, + pf_destroy, + pf_subscribe, + pf_unsubscribe, + pf_unsubscribe_all, + pf_match, + sf_create, + sf_destroy, + sf_subscribe, + sf_unsubscribe, + sf_enumerate, + sf_match, +}; + +void *xs::prefix_filter = (void*) &filter; + diff --git a/src/prefix_filter.hpp b/src/prefix_filter.hpp new file mode 100644 index 0000000..0faa865 --- /dev/null +++ b/src/prefix_filter.hpp @@ -0,0 +1,108 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2011-2012 Spotify AB + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __XS_PREFIX_FILTER_HPP_INCLUDED__ +#define __XS_PREFIX_FILTER_HPP_INCLUDED__ + +#include +#include + +#include "stdint.hpp" + +namespace xs +{ + + // Canonical extension object. + extern void *prefix_filter; + + class prefix_filter_t + { + public: + + prefix_filter_t (); + ~prefix_filter_t (); + + int subscribe (void *core_, void *subscriber_, + const unsigned char *data_, size_t size_); + int unsubscribe (void *core_, void *subscriber_, + const unsigned char *data_, size_t size_); + void unsubscribe_all (void *core_, void *subscriber_); + void enumerate (void *core_); + int match (void *core_, const unsigned char *data_, size_t size_); + void match_all (void *core_, const unsigned char *data_, size_t size_); + + private: + + struct node_t + { + // Pointer to particular subscriber associated with + // the reference count. + typedef std::map subscribers_t; + subscribers_t *subscribers; + + unsigned char min; + unsigned short count; + unsigned short live_nodes; + union { + struct node_t *node; + struct node_t **table; + } next; + + }; + + static void init (node_t *node_); + static void close (node_t *node_); + + // Add key to the trie. Returns true if it's a new subscription + // rather than a duplicate. + static bool add (node_t *node_, const unsigned char *prefix_, + size_t size_, void *subscriber_); + + // Remove specific subscription from the trie. Return true is it + // was actually removed rather than de-duplicated. + static bool rm (node_t *node_, const unsigned char *prefix_, +