diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2012-04-05 07:32:58 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2012-04-05 07:32:58 +0200 |
commit | d82cbb3a81f116cd22e9895ecac36ac3d7b38929 (patch) | |
tree | 03c923311b937f550bec325d131476513a02bebf | |
parent | 52b8a917deb2990e7197b82e81e0258ebe30f424 (diff) |
XS_PLUGIN and XS_FILTER implementation
This patch introduces following features:
- XS_PLUGIN context option to add plugins to libxs
- XS_FILTER option to switch between different filter types
- Automatic loading of plug-ins is *not* implemented.
From the implementation point of view:
- standard prefix filter is implemented as a pluggable filter
- trie_t and mtrie_t are joined into a single class
- the code for 0MQ/3.1 compatibility is left in in the form of comments
- new test for testing re-subscriptions is added
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | builds/msvc/libxs/libxs.vcxproj | 8 | ||||
-rw-r--r-- | builds/msvc/libxs/libxs.vcxproj.filters | 24 | ||||
-rw-r--r-- | builds/msvc/tests/tests.vcxproj | 4 | ||||
-rw-r--r-- | builds/msvc/tests/tests.vcxproj.filters | 3 | ||||
-rw-r--r-- | include/xs.h | 49 | ||||
-rw-r--r-- | src/Makefile.am | 8 | ||||
-rw-r--r-- | src/core.cpp | 68 | ||||
-rw-r--r-- | src/core.hpp | 53 | ||||
-rw-r--r-- | src/ctx.cpp | 39 | ||||
-rw-r--r-- | src/ctx.hpp | 14 | ||||
-rw-r--r-- | src/mtrie.cpp | 436 | ||||
-rw-r--r-- | src/mtrie.hpp | 93 | ||||
-rw-r--r-- | src/object.cpp | 5 | ||||
-rw-r--r-- | src/object.hpp | 5 | ||||
-rw-r--r-- | src/options.cpp | 20 | ||||
-rw-r--r-- | src/options.hpp | 3 | ||||
-rw-r--r-- | src/prefix_filter.cpp | 664 | ||||
-rw-r--r-- | src/prefix_filter.hpp | 108 | ||||
-rw-r--r-- | src/sub.cpp | 19 | ||||
-rw-r--r-- | src/trie.cpp | 338 | ||||
-rw-r--r-- | src/trie.hpp | 79 | ||||
-rw-r--r-- | src/wire.hpp | 4 | ||||
-rw-r--r-- | src/xpub.cpp | 153 | ||||
-rw-r--r-- | src/xpub.hpp | 30 | ||||
-rw-r--r-- | src/xsub.cpp | 119 | ||||
-rw-r--r-- | src/xsub.hpp | 29 | ||||
-rw-r--r-- | tests/Makefile.am | 4 | ||||
-rw-r--r-- | tests/resubscribe.cpp | 103 | ||||
-rw-r--r-- | tests/tests.cpp | 6 |
30 files changed, 1453 insertions, 1037 deletions
@@ -40,6 +40,8 @@ tests/max_sockets tests/emptyctx tests/polltimeo tests/wireformat +tests/libzmq21 +tests/resubscribe src/platform.hpp* src/stamp-h1 perf/local_lat diff --git a/builds/msvc/libxs/libxs.vcxproj b/builds/msvc/libxs/libxs.vcxproj index 24eb9ae..bbab5fd 100644 --- a/builds/msvc/libxs/libxs.vcxproj +++ b/builds/msvc/libxs/libxs.vcxproj @@ -104,6 +104,7 @@ </PropertyGroup> <ItemGroup> <ClCompile Include="..\..\..\src\clock.cpp" /> + <ClCompile Include="..\..\..\src\core.cpp" /> <ClCompile Include="..\..\..\src\ctx.cpp" /> <ClCompile Include="..\..\..\src\decoder.cpp" /> <ClCompile Include="..\..\..\src\devpoll.cpp" /> @@ -122,7 +123,6 @@ <ClCompile Include="..\..\..\src\lb.cpp" /> <ClCompile Include="..\..\..\src\mailbox.cpp" /> <ClCompile Include="..\..\..\src\msg.cpp" /> - <ClCompile Include="..\..\..\src\mtrie.cpp" /> <ClCompile Include="..\..\..\src\object.cpp" /> <ClCompile Include="..\..\..\src\options.cpp" /> <ClCompile Include="..\..\..\src\own.cpp" /> @@ -135,6 +135,7 @@ <ClCompile Include="..\..\..\src\precompiled.cpp"> <PrecompiledHeader>Create</PrecompiledHeader> </ClCompile> + <ClCompile Include="..\..\..\src\prefix_filter.cpp" /> <ClCompile Include="..\..\..\src\pub.cpp" /> <ClCompile Include="..\..\..\src\pull.cpp" /> <ClCompile Include="..\..\..\src\push.cpp" /> @@ -152,7 +153,6 @@ <ClCompile Include="..\..\..\src\tcp_connecter.cpp" /> <ClCompile Include="..\..\..\src\tcp_listener.cpp" /> <ClCompile Include="..\..\..\src\thread.cpp" /> - <ClCompile Include="..\..\..\src\trie.cpp" /> <ClCompile Include="..\..\..\src\upoll.cpp" /> <ClCompile Include="..\..\..\src\xpub.cpp" /> <ClCompile Include="..\..\..\src\xrep.cpp" /> @@ -170,6 +170,7 @@ <ClInclude Include="..\..\..\src\clock.hpp" /> <ClInclude Include="..\..\..\src\command.hpp" /> <ClInclude Include="..\..\..\src\config.hpp" /> + <ClInclude Include="..\..\..\src\core.hpp" /> <ClInclude Include="..\..\..\src\ctx.hpp" /> <ClInclude Include="..\..\..\src\decoder.hpp" /> <ClInclude Include="..\..\..\src\devpoll.hpp" /> @@ -191,7 +192,6 @@ <ClInclude Include="..\..\..\src\likely.hpp" /> <ClInclude Include="..\..\..\src\mailbox.hpp" /> <ClInclude Include="..\..\..\src\msg.hpp" /> - <ClInclude Include="..\..\..\src\mtrie.hpp" /> <ClInclude Include="..\..\..\src\mutex.hpp" /> <ClInclude Include="..\..\..\src\object.hpp" /> <ClInclude Include="..\..\..\src\options.hpp" /> @@ -203,6 +203,7 @@ <ClInclude Include="..\..\..\src\pipe.hpp" /> <ClInclude Include="..\..\..\src\poll.hpp" /> <ClInclude Include="..\..\..\src\precompiled.hpp" /> + <ClInclude Include="..\..\..\src\prefix_filter.hpp" /> <ClInclude Include="..\..\..\src\pub.hpp" /> <ClInclude Include="..\..\..\src\pull.hpp" /> <ClInclude Include="..\..\..\src\push.hpp" /> @@ -221,7 +222,6 @@ <ClInclude Include="..\..\..\src\tcp_connecter.hpp" /> <ClInclude Include="..\..\..\src\tcp_listener.hpp" /> <ClInclude Include="..\..\..\src\thread.hpp" /> - <ClInclude Include="..\..\..\src\trie.hpp" /> <ClInclude Include="..\..\..\src\upoll.hpp" /> <ClInclude Include="..\..\..\src\windows.hpp" /> <ClInclude Include="..\..\..\src\wire.hpp" /> diff --git a/builds/msvc/libxs/libxs.vcxproj.filters b/builds/msvc/libxs/libxs.vcxproj.filters index db8e66d..eb0f505 100644 --- a/builds/msvc/libxs/libxs.vcxproj.filters +++ b/builds/msvc/libxs/libxs.vcxproj.filters @@ -65,9 +65,6 @@ <ClCompile Include="..\..\..\src\msg.cpp"> <Filter>Source Files</Filter> </ClCompile> - <ClCompile Include="..\..\..\src\mtrie.cpp"> - <Filter>Source Files</Filter> - </ClCompile> <ClCompile Include="..\..\..\src\object.cpp"> <Filter>Source Files</Filter> </ClCompile> @@ -149,9 +146,6 @@ <ClCompile Include="..\..\..\src\thread.cpp"> <Filter>Source Files</Filter> </ClCompile> - <ClCompile Include="..\..\..\src\trie.cpp"> - <Filter>Source Files</Filter> - </ClCompile> <ClCompile Include="..\..\..\src\xpub.cpp"> <Filter>Source Files</Filter> </ClCompile> @@ -176,6 +170,12 @@ <ClCompile Include="..\..\..\src\xszmq.cpp"> <Filter>Source Files</Filter> </ClCompile> + <ClCompile Include="..\..\..\src\core.cpp"> + <Filter>Source Files</Filter> + </ClCompile> + <ClCompile Include="..\..\..\src\prefix_filter.cpp"> + <Filter>Source Files</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <ClInclude Include="..\..\..\include\xs.h"> @@ -259,9 +259,6 @@ <ClInclude Include="..\..\..\src\msg.hpp"> <Filter>Header Files</Filter> </ClInclude> - <ClInclude Include="..\..\..\src\mtrie.hpp"> - <Filter>Header Files</Filter> - </ClInclude> <ClInclude Include="..\..\..\src\mutex.hpp"> <Filter>Header Files</Filter> </ClInclude> @@ -349,9 +346,6 @@ <ClInclude Include="..\..\..\src\thread.hpp"> <Filter>Header Files</Filter> </ClInclude> - <ClInclude Include="..\..\..\src\trie.hpp"> - <Filter>Header Files</Filter> - </ClInclude> <ClInclude Include="..\..\..\src\windows.hpp"> <Filter>Header Files</Filter> </ClInclude> @@ -388,5 +382,11 @@ <ClInclude Include="..\..\..\src\io_thread.hpp"> <Filter>Header Files</Filter> </ClInclude> + <ClInclude Include="..\..\..\src\core.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="..\..\..\src\prefix_filter.hpp"> + <Filter>Header Files</Filter> + </ClInclude> </ItemGroup> </Project> diff --git a/builds/msvc/tests/tests.vcxproj b/builds/msvc/tests/tests.vcxproj index ee4d414..bb707c4 100644 --- a/builds/msvc/tests/tests.vcxproj +++ b/builds/msvc/tests/tests.vcxproj @@ -164,6 +164,10 @@ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild> <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild> </ClCompile> + <ClCompile Include="..\..\..\tests\resubscribe.cpp"> + <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild> + <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild> + </ClCompile> </ItemGroup> <ItemGroup> <ProjectReference Include="..\libxs\libxs.vcxproj"> diff --git a/builds/msvc/tests/tests.vcxproj.filters b/builds/msvc/tests/tests.vcxproj.filters index d858166..b1eade2 100644 --- a/builds/msvc/tests/tests.vcxproj.filters +++ b/builds/msvc/tests/tests.vcxproj.filters @@ -62,6 +62,9 @@ <ClCompile Include="..\..\..\tests\libzmq21.cpp"> <Filter>Header Files</Filter> </ClCompile> + <ClCompile Include="..\..\..\tests\resubscribe.cpp"> + <Filter>Header Files</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <Filter Include="Header Files"> diff --git a/include/xs.h b/include/xs.h index 7e49df4..6f0004c 100644 --- a/include/xs.h +++ b/include/xs.h @@ -148,6 +148,7 @@ XS_EXPORT int xs_getmsgopt (xs_msg_t *msg, int option, void *optval, #define XS_MAX_SOCKETS 1 #define XS_IO_THREADS 2 +#define XS_PLUGIN 3 XS_EXPORT void *xs_init (); XS_EXPORT int xs_term (void *context); @@ -258,6 +259,54 @@ XS_EXPORT void *xs_stopwatch_start (void); /* the stopwatch was started. */ XS_EXPORT unsigned long xs_stopwatch_stop (void *watch); +/******************************************************************************/ +/* The API for pluggable filters. */ +/* THIS IS EXPERIMENTAL WORK AND MAY CHANGE WITHOUT PRIOR NOTICE. */ +/******************************************************************************/ + +#define XS_FILTER 34 + +#define XS_PLUGIN_FILTER 1 + +#define XS_FILTER_ALL 0 +#define XS_FILTER_PREFIX 1 + +typedef struct +{ + int type; + int version; + + int (*id) (void *core); + void *(*pf_create) (void *core); + void (*pf_destroy) (void *core, void *pf); + int (*pf_subscribe) (void *core, void *pf, void *subscriber, + const unsigned char *data, size_t size); + int (*pf_unsubscribe) (void *core, void *pf, void *subscriber, + const unsigned char *data, size_t size); + void (*pf_unsubscribe_all) (void *core, void *pf, void *subscriber); + void (*pf_match) (void *core, void *pf, + const unsigned char *data, size_t size); + + void *(*sf_create) (void *core); + void (*sf_destroy) (void *core, void *sf); + int (*sf_subscribe) (void *core, void *sf, + const unsigned char *data, size_t size); + int (*sf_unsubscribe) (void *core, void *sf, + const unsigned char *data, size_t size); + void (*sf_enumerate) (void *core, void *sf); + int (*sf_match) (void *core, void *sf, + const unsigned char *data, size_t size); + +} xs_filter_t; + +XS_EXPORT int xs_filter_subscribed (void *core, + const unsigned char *data, size_t size); + +XS_EXPORT int xs_filter_unsubscribed (void *core, + const unsigned char *data, size_t size); + +XS_EXPORT int xs_filter_matching (void *core, void *subscriber); + #undef XS_EXPORT #ifdef __cplusplus diff --git a/src/Makefile.am b/src/Makefile.am index fdfa8d1..3c260be 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -23,6 +23,7 @@ libxs_la_SOURCES = \ clock.hpp \ command.hpp \ config.hpp \ + core.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ @@ -44,7 +45,6 @@ libxs_la_SOURCES = \ likely.hpp \ mailbox.hpp \ msg.hpp \ - mtrie.hpp \ mutex.hpp \ object.hpp \ options.hpp \ @@ -56,6 +56,7 @@ libxs_la_SOURCES = \ platform.hpp \ poll.hpp \ pair.hpp \ + prefix_filter.hpp \ pub.hpp \ pull.hpp \ push.hpp \ @@ -74,7 +75,6 @@ libxs_la_SOURCES = \ tcp_connecter.hpp \ tcp_listener.hpp \ thread.hpp \ - trie.hpp \ upoll.hpp \ windows.hpp \ wire.hpp \ @@ -85,6 +85,7 @@ libxs_la_SOURCES = \ ypipe.hpp \ yqueue.hpp \ clock.cpp \ + core.cpp \ ctx.cpp \ decoder.cpp \ devpoll.cpp \ @@ -103,7 +104,6 @@ libxs_la_SOURCES = \ lb.cpp \ mailbox.cpp \ msg.cpp \ - mtrie.cpp \ object.cpp \ options.cpp \ own.cpp \ @@ -113,6 +113,7 @@ libxs_la_SOURCES = \ pgm_socket.cpp \ pipe.cpp \ poll.cpp \ + prefix_filter.cpp \ pull.cpp \ push.cpp \ reaper.cpp \ @@ -130,7 +131,6 @@ libxs_la_SOURCES = \ tcp_connecter.cpp \ tcp_listener.cpp \ thread.cpp \ - trie.cpp \ upoll.cpp \ xpub.cpp \ xrep.cpp \ diff --git a/src/core.cpp b/src/core.cpp new file mode 100644 index 0000000..2d16167 --- /dev/null +++ b/src/core.cpp @@ -0,0 +1,68 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../include/xs.h" + +#include "core.hpp" +#include "err.hpp" + +xs::core_t::core_t () +{ +} + +xs::core_t::~core_t () +{ +} + +int xs::core_t::filter_subscribed (const unsigned char *data_, size_t size_) +{ + errno = ENOTSUP; + return -1; +} + +int xs::core_t::filter_unsubscribed (const unsigned char *data_, size_t size_) +{ + errno = ENOTSUP; + return -1; +} + +int xs::core_t::filter_matching (void *subscriber_) +{ + errno = ENOTSUP; + return -1; +} + +int xs_filter_subscribed (void *core_, + const unsigned char *data_, size_t size_) +{ + return ((xs::core_t*) core_)->filter_subscribed (data_, size_); +} + +int xs_filter_unsubscribed (void *core_, + const unsigned char *data_, size_t size_) +{ + return ((xs::core_t*) core_)->filter_unsubscribed (data_, size_); +} + +int xs_filter_matching (void *core_, void *subscriber_) +{ + return ((xs::core_t*) core_)->filter_matching (subscriber_); +} + diff --git a/src/core.hpp b/src/core.hpp new file mode 100644 index 0000000..fb50646 --- /dev/null +++ b/src/core.hpp @@ -0,0 +1,53 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __XS_CORE_HPP_INCLUDED__ +#define __XS_CORE_HPP_INCLUDED__ + +#include <stddef.h> + +namespace xs +{ + + // This class is not a core of Crossroads. It's rather a callback interface + // for extensions, ie. what's extensions see as Crossroads core. + + class core_t + { + public: + + core_t (); + virtual ~core_t (); + + virtual int filter_subscribed (const unsigned char *data_, + size_t size_); + virtual int filter_unsubscribed (const unsigned char *data_, + size_t size_); + virtual int filter_matching (void *subscriber_); + + private: + + core_t (const core_t&); + const core_t &operator = (const core_t&); + }; + +} + +#endif diff --git a/src/ctx.cpp b/src/ctx.cpp index 21dec58..46fa984 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -35,6 +35,7 @@ #include "pipe.hpp" #include "err.hpp" #include "msg.hpp" +#include "prefix_filter.hpp" xs::ctx_t::ctx_t () : tag (0xbadcafe0), @@ -46,6 +47,9 @@ xs::ctx_t::ctx_t () : max_sockets (512), io_thread_count (1) { + // Plug in the standard plugins. + int rc = plug (prefix_filter); + xs_assert (rc == 0); } bool xs::ctx_t::check_tag () @@ -124,6 +128,28 @@ int xs::ctx_t::terminate () return 0; } +int xs::ctx_t::plug (const void *ext_) +{ + if (!ext_) { + errno = EFAULT; + return -1; + } + + // The extension is a message filter plug-in. + xs_filter_t *filter = (xs_filter_t*) ext_; + if (filter->type == XS_PLUGIN_FILTER && filter->version == 1) { + opt_sync.lock (); + filters [filter->id (NULL)] = filter; + opt_sync.unlock (); + return 0; + } + + // Specified extension type is not supported by this version of + // the library. + errno = ENOTSUP; + return -1; +} + int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) { switch (option_) { @@ -145,6 +171,8 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) io_thread_count = *((int*) optval_); opt_sync.unlock (); break; + case XS_PLUGIN: + return plug (optval_); default: errno = EINVAL; return -1; @@ -256,6 +284,17 @@ xs::object_t *xs::ctx_t::get_reaper () return reaper; } +xs_filter_t *xs::ctx_t::get_filter (int filter_id_) +{ + xs_filter_t *result = NULL; + opt_sync.lock (); + filters_t::iterator it = filters.find (filter_id_); + if (it != filters.end ()) + result = it->second; + opt_sync.unlock (); + return result; +} + void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_) { slots [tid_]->send (command_); diff --git a/src/ctx.hpp b/src/ctx.hpp index c4fa96a..54144ad 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -25,7 +25,8 @@ #include <map> #include <vector> #include <string> -#include <stdarg.h> + +#include "../include/xs.h" #include "mailbox.hpp" #include "array.hpp" @@ -88,6 +89,10 @@ namespace xs // Returns reaper thread object. xs::object_t *get_reaper (); + // Get the filter associated with the specified filter ID or NULL + // If such filter is not registered. + xs_filter_t *get_filter (int filter_id_); + // Management of inproc endpoints. int register_endpoint (const char *addr_, endpoint_t &endpoint_); void unregister_endpoints (xs::socket_base_t *socket_); @@ -102,6 +107,9 @@ namespace xs private: + // Plug in the extension specified. + int plug (const void *ext_); + // Used to check whether the object is a context. uint32_t tag; @@ -161,6 +169,10 @@ namespace xs // Synchronisation of access to context options. mutex_t opt_sync; + // List of all filters plugged into the context. + typedef std::map <int, xs_filter_t*> filters_t; + filters_t filters; + ctx_t (const ctx_t&); const ctx_t &operator = (const ctx_t&); }; diff --git a/src/mtrie.cpp b/src/mtrie.cpp deleted file mode 100644 index eae34c2..0000000 --- a/src/mtrie.cpp +++ /dev/null @@ -1,436 +0,0 @@ -/* - Copyright (c) 2011-2012 250bpm s.r.o. - Copyright (c) 2011-2012 Spotify AB - Copyright (c) 2011 Other contributors as noted in the AUTHORS file - - This file is part of Crossroads I/O project. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - Crossroads is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <stdlib.h> - -#include <new> -#include <algorithm> - -#include "platform.hpp" -#if defined XS_HAVE_WINDOWS -#include "windows.hpp" -#endif - -#include "err.hpp" -#include "pipe.hpp" -#include "mtrie.hpp" - -xs::mtrie_t::mtrie_t () : - pipes (0), - min (0), - count (0), - live_nodes (0) -{ -} - -xs::mtrie_t::~mtrie_t () -{ - if (pipes) { - delete pipes; - pipes = 0; - } - - if (count == 1) { - xs_assert (next.node); - delete next.node; - next.node = 0; - } - else if (count > 1) { - for (unsigned short i = 0; i != count; ++i) - if (next.table [i]) - delete next.table [i]; - free (next.table); - } -} - -bool xs::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_) -{ - return add_helper (prefix_, size_, pipe_); -} - -bool xs::mtrie_t::add_helper (unsigned char *prefix_, size_t size_, - pipe_t *pipe_) -{ - // We are at the node corresponding to the prefix. We are done. - if (!size_) { - bool result = !pipes; - if (!pipes) - pipes = new pipes_t; - pipes->insert (pipe_); - return result; - } - - unsigned char c = *prefix_; - if (c < min || c >= min + count) { - - // The character is out of range of currently handled - // charcters. We have to extend the table. - if (!count) { - min = c; - count = 1; - next.node = NULL; - } - else if (count == 1) { - unsigned char oldc = min; - mtrie_t *oldp = next.node; - count = (min < c ? c - min : min - c) + 1; - next.table = (mtrie_t**) - malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - for (unsigned short i = 0; i != count; ++i) - next.table [i] = 0; - min = std::min (min, c); - next.table [oldc - min] = oldp; - } - else if (min < c) { - - // The new character is above the current character range. - unsigned short old_count = count; - count = c - min + 1; - next.table = (mtrie_t**) realloc ((void*) next.table, - sizeof (mtrie_t*) * count); - xs_assert (next.table); - for (unsigned short i = old_count; i != count; i++) - next.table [i] = NULL; - } - else { - - // The new character is below the current character range. - unsigned short old_count = count; - count = (min + old_count) - c; - next.table = (mtrie_t**) realloc ((void*) next.table, - sizeof (mtrie_t*) * count); - xs_assert (next.table); - memmove (next.table + min - c, next.table, - old_count * sizeof (mtrie_t*)); - for (unsigned short i = 0; i != min - c; i++) - next.table [i] = NULL; - min = c; - } - } - - // If next node does not exist, create one. - if (count == 1) { - if (!next.node) { - next.node = new (std::nothrow) mtrie_t; - ++live_nodes; - xs_assert (next.node); - } - return next.node->add_helper (prefix_ + 1, size_ - 1, pipe_); - } - else { - if (!next.table [c - min]) { - next.table [c - min] = new (std::nothrow) mtrie_t; - ++live_nodes; - xs_assert (next.table [c - min]); - } - return next.table [c - min]->add_helper (prefix_ + 1, size_ - 1, pipe_); - } -} - - -void xs::mtrie_t::rm (pipe_t *pipe_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_) -{ - unsigned char *buff = NULL; - rm_helper (pipe_, &buff, 0, 0, func_, arg_); - free (buff); -} - -void xs::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, - size_t buffsize_, size_t maxbuffsize_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_) -{ - // Remove the subscription from this node. - if (pipes && pipes->erase (pipe_) && pipes->empty ()) { - func_ (*buff_, buffsize_, arg_); - delete pipes; - pipes = 0; - } - - // Adjust the buffer. - if (buffsize_ >= maxbuffsize_) { - maxbuffsize_ = buffsize_ + 256; - *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); - alloc_assert (*buff_); - } - - // If there are no subnodes in the trie, return. - if (count == 0) - return; - - // If there's one subnode (optimisation). - if (count == 1) { - (*buff_) [buffsize_] = min; - buffsize_++; - next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, - func_, arg_); - - // Prune the node if it was made redundant by the removal - if (next.node->is_redundant ()) { - delete next.node; - next.node = 0; - count = 0; - --live_nodes; - xs_assert (live_nodes == 0); - } - return; - } - - // If there are multiple subnodes. - // - // New min non-null character in the node table after the removal - unsigned char new_min = min + count - 1; - // New max non-null character in the node table after the removal - unsigned char new_max = min; - for (unsigned short c = 0; c != count; c++) { - (*buff_) [buffsize_] = min + c; - if (next.table [c]) { - next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1, - maxbuffsize_, func_, arg_); - - // Prune redundant nodes from the mtrie - if (next.table [c]->is_redundant ()) { - delete next.table [c]; - next.table [c] = 0; - - xs_assert (live_nodes > 0); - --live_nodes; - } - else { - // The node is not redundant, so it's a candidate for being - // the new min/max node. - // - // We loop through the node array from left to right, so the - // first non-null, non-redundant node encountered is the new - // minimum index. Conversely, the last non-redundant, non-null - // node encountered is the new maximum index. - if (c + min < new_min) - new_min = c + min; - if (c + min > new_max) - new_max = c + min; - } - } - } - - xs_assert (count > 1); - - // Compact the node table if possible - if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - xs_assert (new_min == new_max); - xs_assert (new_min >= min && new_min < min + count); - mtrie_t *node = next.table [new_min - min]; - xs_assert (node); - free (next.table); - next.node = node; - count = 1; - min = new_min; - } - else if (live_nodes > 1 && (new_min > min || new_max < min + count - 1)) { - xs_assert (new_max - new_min + 1 > 1); - - mtrie_t **old_table = next.table; - xs_assert (new_min > min || new_max < min + count - 1); - xs_assert (new_min >= min); - xs_assert (new_max <= min + count - 1); - xs_assert (new_max - new_min + 1 < count); - - count = new_max - new_min + 1; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - - memmove (next.table, old_table + (new_min - min), - sizeof (mtrie_t*) * count); - free (old_table); - - |