From d82cbb3a81f116cd22e9895ecac36ac3d7b38929 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 5 Apr 2012 07:32:58 +0200 Subject: XS_PLUGIN and XS_FILTER implementation This patch introduces following features: - XS_PLUGIN context option to add plugins to libxs - XS_FILTER option to switch between different filter types - Automatic loading of plug-ins is *not* implemented. From the implementation point of view: - standard prefix filter is implemented as a pluggable filter - trie_t and mtrie_t are joined into a single class - the code for 0MQ/3.1 compatibility is left in in the form of comments - new test for testing re-subscriptions is added Signed-off-by: Martin Sustrik --- src/Makefile.am | 8 +- src/core.cpp | 68 ++++++ src/core.hpp | 53 ++++ src/ctx.cpp | 39 +++ src/ctx.hpp | 14 +- src/mtrie.cpp | 436 --------------------------------- src/mtrie.hpp | 93 ------- src/object.cpp | 5 + src/object.hpp | 5 + src/options.cpp | 20 ++ src/options.hpp | 3 + src/prefix_filter.cpp | 664 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/prefix_filter.hpp | 108 ++++++++ src/sub.cpp | 19 +- src/trie.cpp | 338 ------------------------- src/trie.hpp | 79 ------ src/wire.hpp | 4 + src/xpub.cpp | 153 +++++++++--- src/xpub.hpp | 30 ++- src/xsub.cpp | 119 +++++++-- src/xsub.hpp | 29 ++- 21 files changed, 1267 insertions(+), 1020 deletions(-) create mode 100644 src/core.cpp create mode 100644 src/core.hpp delete mode 100644 src/mtrie.cpp delete mode 100644 src/mtrie.hpp create mode 100644 src/prefix_filter.cpp create mode 100644 src/prefix_filter.hpp delete mode 100644 src/trie.cpp delete mode 100644 src/trie.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index fdfa8d1..3c260be 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -23,6 +23,7 @@ libxs_la_SOURCES = \ clock.hpp \ command.hpp \ config.hpp \ + core.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ @@ -44,7 +45,6 @@ libxs_la_SOURCES = \ likely.hpp \ mailbox.hpp \ msg.hpp \ - mtrie.hpp \ mutex.hpp \ object.hpp \ options.hpp \ @@ -56,6 +56,7 @@ libxs_la_SOURCES = \ platform.hpp \ poll.hpp \ pair.hpp \ + prefix_filter.hpp \ pub.hpp \ pull.hpp \ push.hpp \ @@ -74,7 +75,6 @@ libxs_la_SOURCES = \ tcp_connecter.hpp \ tcp_listener.hpp \ thread.hpp \ - trie.hpp \ upoll.hpp \ windows.hpp \ wire.hpp \ @@ -85,6 +85,7 @@ libxs_la_SOURCES = \ ypipe.hpp \ yqueue.hpp \ clock.cpp \ + core.cpp \ ctx.cpp \ decoder.cpp \ devpoll.cpp \ @@ -103,7 +104,6 @@ libxs_la_SOURCES = \ lb.cpp \ mailbox.cpp \ msg.cpp \ - mtrie.cpp \ object.cpp \ options.cpp \ own.cpp \ @@ -113,6 +113,7 @@ libxs_la_SOURCES = \ pgm_socket.cpp \ pipe.cpp \ poll.cpp \ + prefix_filter.cpp \ pull.cpp \ push.cpp \ reaper.cpp \ @@ -130,7 +131,6 @@ libxs_la_SOURCES = \ tcp_connecter.cpp \ tcp_listener.cpp \ thread.cpp \ - trie.cpp \ upoll.cpp \ xpub.cpp \ xrep.cpp \ diff --git a/src/core.cpp b/src/core.cpp new file mode 100644 index 0000000..2d16167 --- /dev/null +++ b/src/core.cpp @@ -0,0 +1,68 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "../include/xs.h" + +#include "core.hpp" +#include "err.hpp" + +xs::core_t::core_t () +{ +} + +xs::core_t::~core_t () +{ +} + +int xs::core_t::filter_subscribed (const unsigned char *data_, size_t size_) +{ + errno = ENOTSUP; + return -1; +} + +int xs::core_t::filter_unsubscribed (const unsigned char *data_, size_t size_) +{ + errno = ENOTSUP; + return -1; +} + +int xs::core_t::filter_matching (void *subscriber_) +{ + errno = ENOTSUP; + return -1; +} + +int xs_filter_subscribed (void *core_, + const unsigned char *data_, size_t size_) +{ + return ((xs::core_t*) core_)->filter_subscribed (data_, size_); +} + +int xs_filter_unsubscribed (void *core_, + const unsigned char *data_, size_t size_) +{ + return ((xs::core_t*) core_)->filter_unsubscribed (data_, size_); +} + +int xs_filter_matching (void *core_, void *subscriber_) +{ + return ((xs::core_t*) core_)->filter_matching (subscriber_); +} + diff --git a/src/core.hpp b/src/core.hpp new file mode 100644 index 0000000..fb50646 --- /dev/null +++ b/src/core.hpp @@ -0,0 +1,53 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __XS_CORE_HPP_INCLUDED__ +#define __XS_CORE_HPP_INCLUDED__ + +#include + +namespace xs +{ + + // This class is not a core of Crossroads. It's rather a callback interface + // for extensions, ie. what's extensions see as Crossroads core. + + class core_t + { + public: + + core_t (); + virtual ~core_t (); + + virtual int filter_subscribed (const unsigned char *data_, + size_t size_); + virtual int filter_unsubscribed (const unsigned char *data_, + size_t size_); + virtual int filter_matching (void *subscriber_); + + private: + + core_t (const core_t&); + const core_t &operator = (const core_t&); + }; + +} + +#endif diff --git a/src/ctx.cpp b/src/ctx.cpp index 21dec58..46fa984 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -35,6 +35,7 @@ #include "pipe.hpp" #include "err.hpp" #include "msg.hpp" +#include "prefix_filter.hpp" xs::ctx_t::ctx_t () : tag (0xbadcafe0), @@ -46,6 +47,9 @@ xs::ctx_t::ctx_t () : max_sockets (512), io_thread_count (1) { + // Plug in the standard plugins. + int rc = plug (prefix_filter); + xs_assert (rc == 0); } bool xs::ctx_t::check_tag () @@ -124,6 +128,28 @@ int xs::ctx_t::terminate () return 0; } +int xs::ctx_t::plug (const void *ext_) +{ + if (!ext_) { + errno = EFAULT; + return -1; + } + + // The extension is a message filter plug-in. + xs_filter_t *filter = (xs_filter_t*) ext_; + if (filter->type == XS_PLUGIN_FILTER && filter->version == 1) { + opt_sync.lock (); + filters [filter->id (NULL)] = filter; + opt_sync.unlock (); + return 0; + } + + // Specified extension type is not supported by this version of + // the library. + errno = ENOTSUP; + return -1; +} + int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) { switch (option_) { @@ -145,6 +171,8 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) io_thread_count = *((int*) optval_); opt_sync.unlock (); break; + case XS_PLUGIN: + return plug (optval_); default: errno = EINVAL; return -1; @@ -256,6 +284,17 @@ xs::object_t *xs::ctx_t::get_reaper () return reaper; } +xs_filter_t *xs::ctx_t::get_filter (int filter_id_) +{ + xs_filter_t *result = NULL; + opt_sync.lock (); + filters_t::iterator it = filters.find (filter_id_); + if (it != filters.end ()) + result = it->second; + opt_sync.unlock (); + return result; +} + void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_) { slots [tid_]->send (command_); diff --git a/src/ctx.hpp b/src/ctx.hpp index c4fa96a..54144ad 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -25,7 +25,8 @@ #include #include #include -#include + +#include "../include/xs.h" #include "mailbox.hpp" #include "array.hpp" @@ -88,6 +89,10 @@ namespace xs // Returns reaper thread object. xs::object_t *get_reaper (); + // Get the filter associated with the specified filter ID or NULL + // If such filter is not registered. + xs_filter_t *get_filter (int filter_id_); + // Management of inproc endpoints. int register_endpoint (const char *addr_, endpoint_t &endpoint_); void unregister_endpoints (xs::socket_base_t *socket_); @@ -102,6 +107,9 @@ namespace xs private: + // Plug in the extension specified. + int plug (const void *ext_); + // Used to check whether the object is a context. uint32_t tag; @@ -161,6 +169,10 @@ namespace xs // Synchronisation of access to context options. mutex_t opt_sync; + // List of all filters plugged into the context. + typedef std::map filters_t; + filters_t filters; + ctx_t (const ctx_t&); const ctx_t &operator = (const ctx_t&); }; diff --git a/src/mtrie.cpp b/src/mtrie.cpp deleted file mode 100644 index eae34c2..0000000 --- a/src/mtrie.cpp +++ /dev/null @@ -1,436 +0,0 @@ -/* - Copyright (c) 2011-2012 250bpm s.r.o. - Copyright (c) 2011-2012 Spotify AB - Copyright (c) 2011 Other contributors as noted in the AUTHORS file - - This file is part of Crossroads I/O project. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - Crossroads is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include - -#include -#include - -#include "platform.hpp" -#if defined XS_HAVE_WINDOWS -#include "windows.hpp" -#endif - -#include "err.hpp" -#include "pipe.hpp" -#include "mtrie.hpp" - -xs::mtrie_t::mtrie_t () : - pipes (0), - min (0), - count (0), - live_nodes (0) -{ -} - -xs::mtrie_t::~mtrie_t () -{ - if (pipes) { - delete pipes; - pipes = 0; - } - - if (count == 1) { - xs_assert (next.node); - delete next.node; - next.node = 0; - } - else if (count > 1) { - for (unsigned short i = 0; i != count; ++i) - if (next.table [i]) - delete next.table [i]; - free (next.table); - } -} - -bool xs::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_) -{ - return add_helper (prefix_, size_, pipe_); -} - -bool xs::mtrie_t::add_helper (unsigned char *prefix_, size_t size_, - pipe_t *pipe_) -{ - // We are at the node corresponding to the prefix. We are done. - if (!size_) { - bool result = !pipes; - if (!pipes) - pipes = new pipes_t; - pipes->insert (pipe_); - return result; - } - - unsigned char c = *prefix_; - if (c < min || c >= min + count) { - - // The character is out of range of currently handled - // charcters. We have to extend the table. - if (!count) { - min = c; - count = 1; - next.node = NULL; - } - else if (count == 1) { - unsigned char oldc = min; - mtrie_t *oldp = next.node; - count = (min < c ? c - min : min - c) + 1; - next.table = (mtrie_t**) - malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - for (unsigned short i = 0; i != count; ++i) - next.table [i] = 0; - min = std::min (min, c); - next.table [oldc - min] = oldp; - } - else if (min < c) { - - // The new character is above the current character range. - unsigned short old_count = count; - count = c - min + 1; - next.table = (mtrie_t**) realloc ((void*) next.table, - sizeof (mtrie_t*) * count); - xs_assert (next.table); - for (unsigned short i = old_count; i != count; i++) - next.table [i] = NULL; - } - else { - - // The new character is below the current character range. - unsigned short old_count = count; - count = (min + old_count) - c; - next.table = (mtrie_t**) realloc ((void*) next.table, - sizeof (mtrie_t*) * count); - xs_assert (next.table); - memmove (next.table + min - c, next.table, - old_count * sizeof (mtrie_t*)); - for (unsigned short i = 0; i != min - c; i++) - next.table [i] = NULL; - min = c; - } - } - - // If next node does not exist, create one. - if (count == 1) { - if (!next.node) { - next.node = new (std::nothrow) mtrie_t; - ++live_nodes; - xs_assert (next.node); - } - return next.node->add_helper (prefix_ + 1, size_ - 1, pipe_); - } - else { - if (!next.table [c - min]) { - next.table [c - min] = new (std::nothrow) mtrie_t; - ++live_nodes; - xs_assert (next.table [c - min]); - } - return next.table [c - min]->add_helper (prefix_ + 1, size_ - 1, pipe_); - } -} - - -void xs::mtrie_t::rm (pipe_t *pipe_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_) -{ - unsigned char *buff = NULL; - rm_helper (pipe_, &buff, 0, 0, func_, arg_); - free (buff); -} - -void xs::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, - size_t buffsize_, size_t maxbuffsize_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_) -{ - // Remove the subscription from this node. - if (pipes && pipes->erase (pipe_) && pipes->empty ()) { - func_ (*buff_, buffsize_, arg_); - delete pipes; - pipes = 0; - } - - // Adjust the buffer. - if (buffsize_ >= maxbuffsize_) { - maxbuffsize_ = buffsize_ + 256; - *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); - alloc_assert (*buff_); - } - - // If there are no subnodes in the trie, return. - if (count == 0) - return; - - // If there's one subnode (optimisation). - if (count == 1) { - (*buff_) [buffsize_] = min; - buffsize_++; - next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, - func_, arg_); - - // Prune the node if it was made redundant by the removal - if (next.node->is_redundant ()) { - delete next.node; - next.node = 0; - count = 0; - --live_nodes; - xs_assert (live_nodes == 0); - } - return; - } - - // If there are multiple subnodes. - // - // New min non-null character in the node table after the removal - unsigned char new_min = min + count - 1; - // New max non-null character in the node table after the removal - unsigned char new_max = min; - for (unsigned short c = 0; c != count; c++) { - (*buff_) [buffsize_] = min + c; - if (next.table [c]) { - next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1, - maxbuffsize_, func_, arg_); - - // Prune redundant nodes from the mtrie - if (next.table [c]->is_redundant ()) { - delete next.table [c]; - next.table [c] = 0; - - xs_assert (live_nodes > 0); - --live_nodes; - } - else { - // The node is not redundant, so it's a candidate for being - // the new min/max node. - // - // We loop through the node array from left to right, so the - // first non-null, non-redundant node encountered is the new - // minimum index. Conversely, the last non-redundant, non-null - // node encountered is the new maximum index. - if (c + min < new_min) - new_min = c + min; - if (c + min > new_max) - new_max = c + min; - } - } - } - - xs_assert (count > 1); - - // Compact the node table if possible - if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - xs_assert (new_min == new_max); - xs_assert (new_min >= min && new_min < min + count); - mtrie_t *node = next.table [new_min - min]; - xs_assert (node); - free (next.table); - next.node = node; - count = 1; - min = new_min; - } - else if (live_nodes > 1 && (new_min > min || new_max < min + count - 1)) { - xs_assert (new_max - new_min + 1 > 1); - - mtrie_t **old_table = next.table; - xs_assert (new_min > min || new_max < min + count - 1); - xs_assert (new_min >= min); - xs_assert (new_max <= min + count - 1); - xs_assert (new_max - new_min + 1 < count); - - count = new_max - new_min + 1; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - - memmove (next.table, old_table + (new_min - min), - sizeof (mtrie_t*) * count); - free (old_table); - - min = new_min; - } -} - -bool xs::mtrie_t::rm (unsigned char *prefix_, size_t size_, pipe_t *pipe_) -{ - return rm_helper (prefix_, size_, pipe_); -} - -bool xs::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_, - pipe_t *pipe_) -{ - if (!size_) { - if (pipes) { - pipes_t::size_type erased = pipes->erase (pipe_); - xs_assert (erased == 1); - if (pipes->empty ()) { - delete pipes; - pipes = 0; - } - } - return !pipes; - } - - unsigned char c = *prefix_; - if (!count || c < min || c >= min + count) - return false; - - mtrie_t *next_node = - count == 1 ? next.node : next.table [c - min]; - - if (!next_node) - return false; - - bool ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_); - - if (next_node->is_redundant ()) { - delete next_node; - xs_assert (count > 0); - - if (count == 1) { - next.node = 0; - count = 0; - --live_nodes; - xs_assert (live_nodes == 0); - } - else { - next.table [c - min] = 0; - xs_assert (live_nodes > 1); - --live_nodes; - - // Compact the table if possible - if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - mtrie_t *node = 0; - for (unsigned short i = 0; i < count; ++i) { - if (next.table [i]) { - node = next.table [i]; - min = i + min; - break; - } - } - - xs_assert (node); - free (next.table); - next.node = node; - count = 1; - } - else if (c == min) { - // We can compact the table "from the left" - unsigned char new_min = min; - for (unsigned short i = 1; i < count; ++i) { - if (next.table [i]) { - new_min = i + min; - break; - } - } - xs_assert (new_min != min); - - mtrie_t **old_table = next.table; - xs_assert (new_min > min); - xs_assert (count > new_min - min); - - count = count - (new_min - min); - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - - memmove (next.table, old_table + (new_min - min), - sizeof (mtrie_t*) * count); - free (old_table); - - min = new_min; - } - else if (c == min + count - 1) { - // We can compact the table "from the right" - unsigned short new_count = count; - for (unsigned short i = 1; i < count; ++i) { - if (next.table [count - 1 - i]) { - new_count = count - i; - break; - } - } - xs_assert (new_count != count); - count = new_count; - - mtrie_t **old_table = next.table; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - - memmove (next.table, old_table, sizeof (mtrie_t*) * count); - free (old_table); - } - } - } - - return ret; -} - -void xs::mtrie_t::match (unsigned char *data_, size_t size_, - void (*func_) (pipe_t *pipe_, void *arg_), void *arg_) -{ - mtrie_t *current = this; - while (true) { - - // Signal the pipes attached to this node. - if (current->pipes) { - for (pipes_t::iterator it = current->pipes->begin (); - it != current->pipes->end (); ++it) - func_ (*it, arg_); - } - - // If we are at the end of the message, there's nothing more to match. - if (!size_) - break; - - // If there are no subnodes in the trie, return. - if (current->count == 0) - break; - - // If there's one subnode (optimisation). - if (current->count == 1) { - if (data_ [0] != current->min) - break; - current = current->next.node; - data_++; - size_--; - continue; - } - - // If there are multiple subnodes. - if (data_ [0] < current->min || data_ [0] >= - current->min + current->count) - break; - if (!current->next.table [data_ [0] - current->min]) - break; - current = current->next.table [data_ [0] - current->min]; - data_++; - size_--; - } -} - -bool xs::mtrie_t::is_redundant () const -{ - return !pipes && live_nodes == 0; -} - diff --git a/src/mtrie.hpp b/src/mtrie.hpp deleted file mode 100644 index 1e56b4e..0000000 --- a/src/mtrie.hpp +++ /dev/null @@ -1,93 +0,0 @@ -/* - Copyright (c) 2011-2012 250bpm s.r.o. - Copyright (c) 2011-2012 Spotify AB - Copyright (c) 2011 Other contributors as noted in the AUTHORS file - - This file is part of Crossroads I/O project. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - Crossroads is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __XS_MTRIE_HPP_INCLUDED__ -#define __XS_MTRIE_HPP_INCLUDED__ - -#include -#include - -#include "stdint.hpp" - -namespace xs -{ - - class pipe_t; - - // Multi-trie. Each node in the trie is a set of pointers to pipes. - - class mtrie_t - { - public: - - mtrie_t (); - ~mtrie_t (); - - // Add key to the trie. Returns true if it's a new subscription - // rather than a duplicate. - bool add (unsigned char *prefix_, size_t size_, xs::pipe_t *pipe_); - - // Remove all subscriptions for a specific peer from the trie. - // If there are no subscriptions left on some topics, invoke the - // supplied callback function. - void rm (xs::pipe_t *pipe_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_); - - // Remove specific subscription from the trie. Return true is it was - // actually removed rather than de-duplicated. - bool rm (unsigned char *prefix_, size_t size_, xs::pipe_t *pipe_); - - // Signal all the matching pipes. - void match (unsigned char *data_, size_t size_, - void (*func_) (xs::pipe_t *pipe_, void *arg_), void *arg_); - - private: - - bool add_helper (unsigned char *prefix_, size_t size_, - xs::pipe_t *pipe_); - void rm_helper (xs::pipe_t *pipe_, unsigned char **buff_, - size_t buffsize_, size_t maxbuffsize_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_); - bool rm_helper (unsigned char *prefix_, size_t size_, - xs::pipe_t *pipe_); - bool is_redundant () const; - - typedef std::set pipes_t; - pipes_t *pipes; - - unsigned char min; - unsigned short count; - unsigned short live_nodes; - union { - class mtrie_t *node; - class mtrie_t **table; - } next; - - mtrie_t (const mtrie_t&); - const mtrie_t &operator = (const mtrie_t&); - }; - -} - -#endif - diff --git a/src/object.cpp b/src/object.cpp index 5c1ed84..4f04fe4 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -154,6 +154,11 @@ xs::io_thread_t *xs::object_t::choose_io_thread (uint64_t affinity_) return ctx->choose_io_thread (affinity_); } +xs_filter_t *xs::object_t::get_filter (int filter_id_) +{ + return ctx->get_filter (filter_id_); +} + void xs::object_t::send_stop () { // 'stop' command goes always from administrative thread to diff --git a/src/object.hpp b/src/object.hpp index 5b855a5..fabb156 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -22,6 +22,8 @@ #ifndef __XS_OBJECT_HPP_INCLUDED__ #define __XS_OBJECT_HPP_INCLUDED__ +#include "../include/xs.h" + #include "stdint.hpp" namespace xs @@ -64,6 +66,9 @@ namespace xs // Chooses least loaded I/O thread. xs::io_thread_t *choose_io_thread (uint64_t affinity_); + // Functions related to extensions. + xs_filter_t *get_filter (int filter_id_); + // Derived object can use these functions to send commands // to other objects. void send_stop (); diff --git a/src/options.cpp b/src/options.cpp index c9cbaae..fdbffde 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -23,6 +23,8 @@ #include #include +#include "../include/xs.h" + #include "options.hpp" #include "err.hpp" @@ -47,6 +49,7 @@ xs::options_t::options_t () : ipv4only (1), keepalive (0), protocol (0), + filter_id (XS_FILTER_PREFIX), delay_on_close (true), delay_on_disconnect (true), filter (false), @@ -248,6 +251,14 @@ int xs::options_t::setsockopt (int option_, const void *optval_, return 0; } + case XS_FILTER: + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + filter_id = *((int*) optval_); + return 0; + } errno = EINVAL; @@ -438,6 +449,15 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (int); return 0; + case XS_FILTER: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = filter_id; + *optvallen_ = sizeof (int); + return 0; + } errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index c1e4dda..1288f72 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -95,6 +95,9 @@ namespace xs // Version of wire protocol to use. int protocol; + // Filter ID to be used with subscriptions and unsubscriptions. + int filter_id; + // If true, session reads all the pending messages from the pipe and // sends them to the network when socket is closed. bool delay_on_close; diff --git a/src/prefix_filter.cpp b/src/prefix_filter.cpp new file mode 100644 index 0000000..5d21fb6 --- /dev/null +++ b/src/prefix_filter.cpp @@ -0,0 +1,664 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2011-2012 Spotify AB + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include + +#include "../include/xs.h" + +#include "prefix_filter.hpp" +#include "err.hpp" + +xs::prefix_filter_t::prefix_filter_t () +{ + init (&root); +} + +xs::prefix_filter_t::~prefix_filter_t () +{ + close (&root); +} + +int xs::prefix_filter_t::subscribe (void *core_, void *subscriber_, + const unsigned char *data_, size_t size_) +{ + return add (&root, data_, size_, subscriber_) ? 1 : 0; +} + +int xs::prefix_filter_t::unsubscribe (void *core_, void *subscriber_, + const unsigned char *data_, size_t size_) +{ + return rm (&root, data_, size_, subscriber_) ? 1 : 0; +} + +void xs::prefix_filter_t::unsubscribe_all (void *core_, void *subscriber_) +{ + rm (&root, subscriber_, core_); +} + +void xs::prefix_filter_t::enumerate (void *core_) +{ + unsigned char *buff = NULL; + list (&root, &buff, 0, 0, core_); + free (buff); +} + +int xs::prefix_filter_t::match (void *core_, + const unsigned char *data_, size_t size_) +{ + // This function is on critical path. It deliberately doesn't use + // recursion to get a bit better performance. + node_t *current = &root; + while (true) { + + // We've found a corresponding subscription! + if (current->subscribers) + return 1; + + // We've checked all the data and haven't found matching subscription. + if (!size_) + return 0; + + // If there's no corresponding slot for the first character + // of the prefix, the message does not match. + unsigned char c = *data_; + if (c < current->min || c >= current->min + current->count) + return 0; + + // Move to the next character. + if (current->count == 1) + current = current->next.node; + else { + current = current->next.table [c - current->min]; + if (!current) + return 0; + } + data_++; + size_--; + } + +} + +void xs::prefix_filter_t::match_all (void *core_, + const unsigned char *data_, size_t size_) +{ + node_t *current = &root; + while (true) { + + // Signal the subscribers attached to this node. + if (current->subscribers) { + for (node_t::subscribers_t::iterator it = + current->subscribers->begin (); + it != current->subscribers->end (); ++it) { + int rc = xs_filter_matching (core_, it->first); + errno_assert (rc == 0); + } + } + + // If we are at the end of the message, there's nothing more to match. + if (!size_) + break; + + // If there are no subnodes in the trie, return. + if (current->count == 0) + break; + + // If there's one subnode (optimisation). + if (current->count == 1) { + if (data_ [0] != current->min) + break; + current = current->next.node; + data_++; + size_--; + continue; + } + + // If there are multiple subnodes. + if (data_ [0] < current->min || data_ [0] >= + current->min + current->count) + break; + if (!current->next.table [data_ [0] - current->min]) + break; + current = current->next.table [data_ [0] - current->min]; + data_++; + size_--; + } +} + +void xs::prefix_filter_t::init (node_t *node_) +{ + node_->subscribers = NULL; + node_->min = 0; + node_->count = 0; + node_->live_nodes = 0; +} + +void xs::prefix_filter_t::close (node_t *node_) +{ + if (node_->subscribers) { + delete node_->subscribers; + node_->subscribers = NULL; + } + + if (node_->count == 1) { + xs_assert (node_->next.node); + close (node_->next.node); + delete node_->next.node; + node_->next.node = NULL; + } + else if (node_->count > 1) { + for (unsigned short i = 0; i != node_->count; ++i) + if (node_->next.table [i]) { + close (node_->next.table [i]); + delete node_->next.table [i]; + } + free (node_->next.table); + } +} + +bool xs::prefix_filter_t::add (node_t *node_, const unsigned char *prefix_, + size_t size_, void *subscriber_) +{ + // We are at the node corresponding to the prefix. We are done. + if (!size_) { + bool result = !node_->subscribers; + if (!node_->subscribers) + node_->subscribers = new (std::nothrow) node_t::subscribers_t; + node_t::subscribers_t::iterator it = node_->subscribers->insert ( + node_t::subscribers_t::value_type (subscriber_, 0)).first; + ++it->second; + return result; + } + + unsigned char c = *prefix_; + if (c < node_->min || c >= node_->min + node_->count) { + + // The character is out of range of currently handled + // charcters. We have to extend the table. + if (!node_->count) { + node_->min = c; + node_->count = 1; + node_->next.node = NULL; + } + else if (node_->count == 1) { + unsigned char oldc = node_->min; + node_t *oldp = node_->next.node; + node_->count = + (node_->min < c ? c - node_->min : node_->min - c) + 1; + node_->next.table = (node_t**) + malloc (sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + for (unsigned short i = 0; i != node_->count; ++i) + node_->next.table [i] = 0; + node_->min = std::min (node_->min, c); + node_->next.table [oldc - node_->min] = oldp; + } + else if (node_->min < c) { + + // The new character is above the current character range. + unsigned short old_count = node_->count; + node_->count = c - node_->min + 1; + node_->next.table = (node_t**) realloc ((void*) node_->next.table, + sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + for (unsigned short i = old_count; i != node_->count; i++) + node_->next.table [i] = NULL; + } + else { + + // The new character is below the current character range. + unsigned short old_count = node_->count; + node_->count = (node_->min + old_count) - c; + node_->next.table = (node_t**) realloc ((void*) node_->next.table, + sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + memmove (node_->next.table + node_->min - c, node_->next.table, + old_count * sizeof (node_t*)); + for (unsigned short i = 0; i != node_->min - c; i++) + node_->next.table [i] = NULL; + node_->min = c; + } + } + + // If next node does not exist, create one. + if (node_->count == 1) { + if (!node_->next.node) { + node_->next.node = new (std::nothrow) node_t; + alloc_assert (node_->next.node); + init (node_->next.node); + ++node_->live_nodes; + xs_assert (node_->next.node); + } + return add (node_->next.node, prefix_ + 1, size_ - 1, subscriber_); + } + else { + if (!node_->next.table [c - node_->min]) { + node_->next.table [c - node_->min] = new (std::nothrow) node_t; + alloc_assert (node_->next.table [c - node_->min]); + init (node_->next.table [c - node_->min]); + ++node_->live_nodes; + xs_assert (node_->next.table [c - node_->min]); + } + return add (node_->next.table [c - node_->min], prefix_ + 1, size_ - 1, + subscriber_); + } +} + + +void xs::prefix_filter_t::rm (node_t *node_, void *subscriber_, void *arg_) +{ + unsigned char *buff = NULL; + rm_helper (node_, subscriber_, &buff, 0, 0, arg_); + free (buff); +} + +void xs::prefix_filter_t::rm_helper (node_t *node_, void *subscribers_, + unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_, void *arg_) +{ + // Remove the subscription from this node. + if (node_->subscribers) { + node_t::subscribers_t::iterator it = + node_->subscribers->find (subscribers_); + if (it != node_->subscribers->end ()) { + xs_assert (it->second); + --it->second; + if (!it->second) { + node_->subscribers->erase (it); + if (node_->subscribers->empty ()) { + int rc = xs_filter_unsubscribed (arg_, *buff_, buffsize_); + errno_assert (rc == 0); + delete node_->subscribers; + node_->subscribers = 0; + } + } + } + } + + // Adjust the buffer. + if (buffsize_ >= maxbuffsize_) { + maxbuffsize_ = buffsize_ + 256; + *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); + alloc_assert (*buff_); + } + + // If there are no subnodes in the trie, return. + if (node_->count == 0) + return; + + // If there's one subnode (optimisation). + if (node_->count == 1) { + (*buff_) [buffsize_] = node_->min; + buffsize_++; + rm_helper (node_->next.node, subscribers_, buff_, buffsize_, + maxbuffsize_, arg_); + + // Prune the node if it was made redundant by the removal + if (is_redundant (node_->next.node)) { + close (node_->next.node); + delete node_->next.node; + node_->next.node = 0; + node_->count = 0; + --node_->live_nodes; + xs_assert (node_->live_nodes == 0); + } + return; + } + + // If there are multiple subnodes. + // + // New min non-null character in the node table after the removal. + unsigned char new_min = node_->min; + // New max non-null character in the node table after the removal. + unsigned char new_max = node_->min + node_->count - 1; + for (unsigned short c = 0; c != node_->count; c++) { + (*buff_) [buffsize_] = node_->min + c; + if (node_->next.table [c]) { + rm_helper (node_->next.table [c], subscribers_, buff_, + buffsize_ + 1, maxbuffsize_, arg_); + + // Prune redudant nodes from the the trie. + if (is_redundant (node_->next.table [c])) { + close (node_->next.table [c]); + delete node_->next.table [c]; + node_->next.table [c] = 0; + + xs_assert (node_->live_nodes > 0); + --node_->live_nodes; + } + else { + if (c + node_->min > new_min) + new_min = c + node_->min; + if (c + node_->min < new_max) + new_max = c + node_->min; + } + } + } + + xs_assert (node_->count > 1); + + // Compact the node table if possible. + if (node_->live_nodes == 1) { + + // If there's only one live node in the table we can + // switch to using the more compact single-node + // representation. + xs_assert (new_min == new_max); + xs_assert (new_min >= node_->min && + new_min < node_->min + node_->count); + node_t *node = node_->next.table [new_min - node_->min]; + xs_assert (node); + free (node_->next.table); + node_->next.node = node; + node_->count = 1; + node_->min = new_min; + } + else if (node_->live_nodes > 1 && + (new_min > node_->min || new_max < node_->min + node_->count - 1)) { + xs_assert (new_max - new_min + 1 > 1); + + node_t **old_table = node_->next.table; + xs_assert (new_min > node_->min || + new_max < node_->min + node_->count - 1); + xs_assert (new_min >= node_->min); + xs_assert (new_max <= node_->min + node_->count - 1); + xs_assert (new_max - new_min + 1 < node_->count); + + node_->count = new_max - new_min + 1; + node_->next.table = (node_t**) malloc (sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + + memmove (node_->next.table, old_table + (new_min - node_->min), + sizeof (node_t*) * node_->count); + free (old_table); + + node_->min = new_min; + } +} + +bool xs::prefix_filter_t::rm (node_t *node_, const unsigned char *prefix_, + size_t size_, void *subscriber_) +{ + if (!size_) { + + // Remove the subscription from this node. + if (node_->subscribers) { + node_t::subscribers_t::iterator it = + node_->subscribers->find (subscriber_); + if (it != node_->subscribers->end ()) { + xs_assert (it->second); + --it->second; + if (!it->second) { + node_->subscribers->erase (it); + if (node_->subscribers->empty ()) { + delete node_->subscribers; + node_->subscribers = 0; + } + } + } + } + return !node_->subscribers; + } + + unsigned char c = *prefix_; + if (!node_->count || c < node_->min || c >= node_->min + node_->count) + return false; + + node_t *next_node = node_->count == 1 ? node_->next.node : + node_->next.table [c - node_->min]; + + if (!next_node) + return false; + + bool ret = rm (next_node, prefix_ + 1, size_ - 1, subscriber_); + + if (is_redundant (next_node)) { + close (next_node); + delete next_node; + xs_assert (node_->count > 0); + + if (node_->count == 1) { + node_->next.node = 0; + node_->count = 0; + --node_->live_nodes; + xs_assert (node_->live_nodes == 0); + } + else { + node_->next.table [c - node_->min] = 0; + xs_assert (node_->live_nodes > 1); + --node_->live_nodes; + + // Compact the table if possible. + if (node_->live_nodes == 1) { + // If there's only one live node in the table we can + // switch to using the more compact single-node + // representation + node_t *node = 0; + for (unsigned short i = 0; i < node_->count; ++i) { + if (node_->next.table [i]) { + node = node_->next.table [i]; + node_->min += i; + break; + } + } + + xs_assert (node); + free (node_->next.table); + node_->next.node = node; + node_->count = 1; + } + else if (c == node_->min) { + + // We can compact the table "from the left". + unsigned char new_min = node_->min; + for (unsigned short i = 1; i < node_->count; ++i) { + if (node_->next.table [i]) { + new_min = i + node_->min; + break; + } + } + xs_assert (new_min != node_->min); + + node_t **old_table = node_->next.table; + xs_assert (new_min > node_->min); + xs_assert (node_->count > new_min - node_->min); + + node_->count = node_->count - (new_min - node_->min); + node_->next.table = + (node_t**) malloc (sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + + memmove (node_->next.table, old_table + (new_min - node_->min), + sizeof (node_t*) * node_->count); + free (old_table); + + node_->min = new_min; + } + else if (c == node_->min + node_->count - 1) { + + // We can compact the table "from the right". + unsigned short new_count = node_->count; + for (unsigned short i = 1; i < node_->count; ++i) { + if (node_->next.table [node_->count - 1 - i]) { + new_count = node_->count - i; + break; + } + } + xs_assert (new_count != node_->count); + node_->count = new_count; + + node_t **old_table = node_->next.table; + node_->next.table = + (node_t**) malloc (sizeof (node_t*) * node_->count); + xs_assert (node_->next.table); + + memmove (node_->next.table, old_table, + sizeof (node_t*) * node_->count); + free (old_table); + } + } + } + + return ret; +} + +void xs::prefix_filter_t::list (node_t *node_, unsigned char **buff_, + size_t buffsize_, size_t maxbuffsize_, void *arg_) +{ + // If this node is a subscription, apply the function. + if (node_->subscribers) { + int rc = xs_filter_subscribed (arg_, *buff_, buffsize_); + errno_assert (rc == 0); + } + + // Adjust the buffer. + if (buffsize_ >= maxbuffsize_) { + maxbuffsize_ = buffsize_ + 256; + *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); + xs_assert (*buff_); + } + + // If there are no subnodes in the trie, return. + if (node_->count == 0) + return; + + // If there's one subnode (optimisation). + if (node_->count == 1) { + (*buff_) [buffsize_] = node_->min; + buffsize_++; + list (node_->next.node, buff_, buffsize_, maxbuffsize_, arg_); + return; + } + + // If there are multiple subnodes. + for (unsigned short c = 0; c != node_->count; c++) { + (*buff_) [buffsize_] = node_->min + c; + if (node_->next.table [c]) + list (node_->next.table [c], buff_, buffsize_ + 1, maxbuffsize_, + arg_); + } +} + +bool xs::prefix_filter_t::is_redundant (node_t *node_) +{ + return !node_->subscribers && node_->live_nodes == 0; +} + +// Implementation of the C interface of the filter. +// Following functions convert raw C calls into calls to C++ object methods. + +static int id (void *core_) +{ + return XS_FILTER_PREFIX; +} + +static void *pf_create (void *core_) +{ + void *pf = (void*) new (std::nothrow) xs::prefix_filter_t; + alloc_assert (pf); + return pf; +} + +static void pf_destroy (void *core_, void *pf_) +{ + delete (xs::prefix_filter_t*) pf_; +} + +static int pf_subscribe (void *core_, void *pf_, void *subscriber_, + const unsigned char *data_, size_t size_) +{ + return ((xs::prefix_filter_t*) pf_)->subscribe (core_, subscriber_, + data_, size_); +} + +static int pf_unsubscribe (void *core_, void *pf_, void *subscriber_, + const unsigned char *data_, size_t size_) +{ + return ((xs::prefix_filter_t*) pf_)->unsubscribe (core_, subscriber_, + data_, size_); +} + +static void pf_unsubscribe_all (void *core_, void *pf_, void *subscriber_) +{ + ((xs::prefix_filter_t*) pf_)->unsubscribe_all (core_, subscriber_); +} + +static void pf_match (void *core_, void *pf_, + const unsigned char *data_, size_t size_) +{ + ((xs::prefix_filter_t*) pf_)->match_all (core_, data_, size_); +} + +static void *sf_create (void *core_) +{ + void *sf = (void*) new (std::nothrow) xs::prefix_filter_t; + alloc_assert (sf); + return sf; +} + +static void sf_destroy (void *core_, void *sf_) +{ + delete (xs::prefix_filter_t*) sf_; +} + +static int sf_subscribe (void *core_, void *sf_, + const unsigned char *data_, size_t size_) +{ + return ((xs::prefix_filter_t*) sf_)->subscribe (core_, NULL, + data_, size_); +} + +static int sf_unsubscribe (void *core_, void *sf_, + const unsigned char *data_, size_t size_) +{ + return ((xs::prefix_filter_t*) sf_)->unsubscribe (core_, NULL, + data_, size_); +} + +static void sf_enumerate (void *core_, void *sf_) +{ + ((xs::prefix_filter_t*) sf_)->enumerate (core_); +} + +static int sf_match (void *core_, void *sf_, + const unsigned char *data_, size_t size_) +{ + return ((xs::prefix_filter_t*) sf_)->match (core_, data_, size_); +} + +static xs_filter_t filter = { + XS_PLUGIN_FILTER, + 1, + id, + pf_create, + pf_destroy, + pf_subscribe, + pf_unsubscribe, + pf_unsubscribe_all, + pf_match, + sf_create, + sf_destroy, + sf_subscribe, + sf_unsubscribe, + sf_enumerate, + sf_match, +}; + +void *xs::prefix_filter = (void*) &filter; + diff --git a/src/prefix_filter.hpp b/src/prefix_filter.hpp new file mode 100644 index 0000000..0faa865 --- /dev/null +++ b/src/prefix_filter.hpp @@ -0,0 +1,108 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2011-2012 Spotify AB + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __XS_PREFIX_FILTER_HPP_INCLUDED__ +#define __XS_PREFIX_FILTER_HPP_INCLUDED__ + +#include +#include + +#include "stdint.hpp" + +namespace xs +{ + + // Canonical extension object. + extern void *prefix_filter; + + class prefix_filter_t + { + public: + + prefix_filter_t (); + ~prefix_filter_t (); + + int subscribe (void *core_, void *subscriber_, + const unsigned char *data_, size_t size_); + int unsubscribe (void *core_, void *subscriber_, + const unsigned char *data_, size_t size_); + void unsubscribe_all (void *core_, void *subscriber_); + void enumerate (void *core_); + int match (void *core_, const unsigned char *data_, size_t size_); + void match_all (void *core_, const unsigned char *data_, size_t size_); + + private: + + struct node_t + { + // Pointer to particular subscriber associated with + // the reference count. + typedef std::map subscribers_t; + subscribers_t *subscribers; + + unsigned char min; + unsigned short count; + unsigned short live_nodes; + union { + struct node_t *node; + struct node_t **table; + } next; + + }; + + static void init (node_t *node_); + static void close (node_t *node_); + + // Add key to the trie. Returns true if it's a new subscription + // rather than a duplicate. + static bool add (node_t *node_, const unsigned char *prefix_, + size_t size_, void *subscriber_); + + // Remove specific subscription from the trie. Return true is it + // was actually removed rather than de-duplicated. + static bool rm (node_t *node_, const unsigned char *prefix_, + size_t size_, void *subscriber_); + + // Remove all subscriptions for a specific peer from the trie. + // If there are no subscriptions left on some topics, invoke the + // supplied callback function. + static void rm (node_t *node_, void *subscriber_, void *arg_); + + static void rm_helper (node_t *node_, void *subscriber_, + unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_, + void *arg_); + + // Lists all the subscriptions in the trie. + static void list (node_t *node_, unsigned char **buff_, + size_t buffsize_, size_t maxbuffsize_, void *arg_); + + // Checks whether node can be safely removed. + static bool is_redundant (node_t *node_); + + node_t root; + + prefix_filter_t (const prefix_filter_t&); + const prefix_filter_t &operator = (const prefix_filter_t&); + }; + +} + +#endif diff --git a/src/sub.cpp b/src/sub.cpp index 3065345..442dd9b 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -21,6 +21,7 @@ #include "sub.hpp" #include "msg.hpp" +#include "wire.hpp" xs::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : xsub_t (parent_, tid_, sid_) @@ -51,14 +52,28 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_, // Create the subscription message. msg_t msg; + int rc = msg.init_size (optvallen_ + 4); + errno_assert (rc == 0); + unsigned char *data = (unsigned char*) msg.data (); + if (option_ == XS_SUBSCRIBE) + put_uint16 (data, XS_CMD_SUBSCRIBE); + else if (option_ == XS_UNSUBSCRIBE) + put_uint16 (data, XS_CMD_UNSUBSCRIBE); + put_uint16 (data + 2, options.filter_id); + memcpy (data + 4, optval_, optvallen_); + +#if 0 + // TODO: This is 0MQ/3.1 protocol. + msg_t msg; int rc = msg.init_size (optvallen_ + 1); errno_assert (rc == 0); unsigned char *data = (unsigned char*) msg.data (); if (option_ == XS_SUBSCRIBE) - *data = 1; + data [0] = 1; else if (option_ == XS_UNSUBSCRIBE) - *data = 0; + data [0] = 0; memcpy (data + 1, optval_, optvallen_); +#endif // Pass it further on in the stack. int err = 0; diff --git a/src/trie.cpp b/src/trie.cpp deleted file mode 100644 index af6cdd9..0000000 --- a/src/trie.cpp +++ /dev/null @@ -1,338 +0,0 @@ -/* - Copyright (c) 2009-2012 250bpm s.r.o. - Copyright (c) 2007-2009 iMatix Corporation - Copyright (c) 2011-2012 Spotify AB - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of Crossroads I/O project. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - Crossroads is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include - -#include -#include - -#include "platform.hpp" -#if defined XS_HAVE_WINDOWS -#include "windows.hpp" -#endif - -#include "err.hpp" -#include "trie.hpp" - -xs::trie_t::trie_t () : - refcnt (0), - min (0), - count (0), - live_nodes (0) -{ -} - -xs::trie_t::~trie_t () -{ - if (count == 1) { - xs_assert (next.node); - delete next.node; - next.node = 0; - } - else if (count > 1) { - for (unsigned short i = 0; i != count; ++i) - if (next.table [i]) - delete next.table [i]; - free (next.table); - } -} - -bool xs::trie_t::add (unsigned char *prefix_, size_t size_) -{ - // We are at the node corresponding to the prefix. We are done. - if (!size_) { - ++refcnt; - return refcnt == 1; - } - - unsigned char c = *prefix_; - if (c < min || c >= min + count) { - - // The character is out of range of currently handled - // charcters. We have to extend the table. - if (!count) { - min = c; - count = 1; - next.node = NULL; - } - else if (count == 1) { - unsigned char oldc = min; - trie_t *oldp = next.node; - count = (min < c ? c - min : min - c) + 1; - next.table = (trie_t**) - malloc (sizeof (trie_t*) * count); - xs_assert (next.table); - for (unsigned short i = 0; i != count; ++i) - next.table [i] = 0; - min = std::min (min, c); - next.table [oldc - min] = oldp; - } - else if (min < c) { - - // The new character is above the current character range. - unsigned short old_count = count; - count = c - min + 1; - next.table = (trie_t**) realloc ((void*) next.table, - sizeof (trie_t*) * count); - xs_assert (next.table); - for (unsigned short i = old_count; i != count; i++) - next.table [i] = NULL; - } - else { - - // The new character is below the current character range. - unsigned short old_count = count; - count = (min + old_count) - c; - next.table = (trie_t**) realloc ((void*) next.table, - sizeof (trie_t*) * count); - xs_assert (next.table); - memmove (next.table + min - c, next.table, - old_count * sizeof (trie_t*)); - for (unsigned short i = 0; i != min - c; i++) - next.table [i] = NULL; - min = c; - } - } - - // If next node does not exist, create one. - if (count == 1) { - if (!next.node) { - next.node = new (std::nothrow) trie_t; - xs_assert (next.node); - ++live_nodes; - xs_assert (live_nodes == 1); - } - return next.node->add (prefix_ + 1, size_ - 1); - } - else { - if (!next.table [c - min]) { - next.table [c - min] = new (std::nothrow) trie_t; - xs_assert (next.table [c - min]); - ++live_nodes; - xs_assert (live_nodes > 1); - } - return next.table [c - min]->add (prefix_ + 1, size_ - 1); - } -} - -bool xs::trie_t::rm (unsigned char *prefix_, size_t size_) -{ - // TODO: Shouldn't an error be reported if the key does not exist? - - if (!size_) { - if (!refcnt) - return false; - refcnt--; - return refcnt == 0; - } - - unsigned char c = *prefix_; - if (!count || c < min || c >= min + count) - return false; - - trie_t *next_node = - count == 1 ? next.node : next.table [c - min]; - - if (!next_node) - return false; - - bool ret = next_node->rm (prefix_ + 1, size_ - 1); - - // Prune redundant nodes - if (next_node->is_redundant ()) { - delete next_node; - xs_assert (count > 0); - - if (count == 1) { - // The just pruned node is was the only live node - next.node = 0; - count = 0; - --live_nodes; - xs_assert (live_nodes == 0); - } - else { - next.table [c - min] = 0; - xs_assert (live_nodes > 1); - --live_nodes; - - // Compact the table if possible - if (live_nodes == 1) { - // We can switch to using the more compact single-node - // representation since the table only contains one live node - trie_t *node = 0; - // Since we always compact the table the pruned node must - // either be the left-most or right-most ptr in the node - // table - if (c == min) { - // The pruned node is the left-most node ptr in the - // node table => keep the right-most node - node = next.table [count - 1]; - min += count - 1; - } - else if (c == min + count - 1) { - // The pruned node is the right-most node ptr in the - // node table => keep the left-most node - node = next.table [0]; - } - - xs_assert (node); - free (next.table); - next.node = node; - count = 1; - } - else if (c == min) { - // We can compact the table "from the left". - // Find the left-most non-null node ptr, which we'll use as - // our new min - unsigned char new_min = min; - for (unsigned short i = 1; i < count; ++i) { - if (next.table [i]) { - new_min = i + min; - break; - } - } - xs_assert (new_min != min); - - trie_t **old_table = next.table; - xs_assert (new_min > min); - xs_assert (count > new_min - min); - - count = count - (new_min - min); - next.table = (trie_t**) malloc (sizeof (trie_t*) * count); - xs_assert (next.table); - - memmove (next.table, old_table + (new_min - min), - sizeof (trie_t*) * count); - free (old_table); - - min = new_min; - } - else if (c == min + count - 1) { - // We can compact the table "from the right". - // Find the right-most non-null node ptr, which we'll use to - // determine the new table size - unsigned short new_count = count; - for (unsigned short i = 1; i < count; ++i) { - if (next.table [count - 1 - i]) { - new_count = count - i; - break; - } - } - xs_assert (new_count != count); - count = new_count; - - trie_t **old_table = next.table; - next.table = (trie_t**) malloc (sizeof (trie_t*) * count); - xs_assert (next.table); - - memmove (next.table, old_table, sizeof (trie_t*) * count); - free (old_table); - } - } - } - - return ret; -} - -bool xs::trie_t::check (unsigned char *data_, size_t size_) -{ - // This function is on critical path. It deliberately doesn't use - // recursion to get a bit better performance. - trie_t *current = this; - while (true) { - - // We've found a corresponding subscription! - if (current->refcnt) - return true; - - // We've checked all the data and haven't found matching subscription. - if (!size_) - return false; - - // If there's no corresponding slot for the first character - // of the prefix, the message does not match. - unsigned char c = *data_; - if (c < current->min || c >= current->min + current->count) - return false; - - // Move to the next character. - if (current->count == 1) - current = current->next.node; - else { - current = current->next.table [c - current->min]; - if (!current) - return false; - } - data_++; - size_--; - } -} - -void xs::trie_t::apply (void (*func_) (unsigned char *data_, size_t size_, - void *arg_), void *arg_) -{ - unsigned char *buff = NULL; - apply_helper (&buff, 0, 0, func_, arg_); - free (buff); -} - -void xs::trie_t::apply_helper ( - unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), void *arg_) -{ - // If this node is a subscription, apply the function. - if (refcnt) - func_ (*buff_, buffsize_, arg_); - - // Adjust the buffer. - if (buffsize_ >= maxbuffsize_) { - maxbuffsize_ = buffsize_ + 256; - *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); - xs_assert (*buff_); - } - - // If there are no subnodes in the trie, return. - if (count == 0) - return; - - // If there's one subnode (optimisation). - if (count == 1) { - (*buff_) [buffsize_] = min; - buffsize_++; - next.node->apply_helper (buff_, buffsize_, maxbuffsize_, func_, arg_); - return; - } - - // If there are multiple subnodes. - for (unsigned short c = 0; c != count; c++) { - (*buff_) [buffsize_] = min + c; - if (next.table [c]) - next.table [c]->apply_helper (buff_, buffsize_ + 1, maxbuffsize_, - func_, arg_); - } -} - -bool xs::trie_t::is_redundant () const -{ - return refcnt == 0 && live_nodes == 0; -} - diff --git a/src/trie.hpp b/src/trie.hpp deleted file mode 100644 index 3e65a1a..0000000 --- a/src/trie.hpp +++ /dev/null @@ -1,79 +0,0 @@ -/* - Copyright (c) 2009-2012 250bpm s.r.o. - Copyright (c) 2007-2009 iMatix Corporation - Copyright (c) 2011-2012 Spotify AB - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of Crossroads I/O project. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - Crossroads is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __XS_TRIE_HPP_INCLUDED__ -#define __XS_TRIE_HPP_INCLUDED__ - -#include - -#include "stdint.hpp" - -namespace xs -{ - - class trie_t - { - public: - - trie_t (); - ~trie_t (); - - // Add key to the trie. Returns true if this is a new item in the trie - // rather than a duplicate. - bool add (unsigned char *prefix_, size_t size_); - - // Remove key from the trie. Returns true if the item is actually - // removed from the trie. - bool rm (unsigned char *prefix_, size_t size_); - - // Check whether particular key is in the trie. - bool check (unsigned char *data_, size_t size_); - - // Apply the function supplied to each subscription in the trie. - void apply (void (*func_) (unsigned char *data_, size_t size_, - void *arg_), void *arg_); - - private: - - void apply_helper ( - unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_); - bool is_redundant () const; - - uint32_t refcnt; - unsigned char min; - unsigned short count; - unsigned short live_nodes; - union { - class trie_t *node; - class trie_t **table; - } next; - - trie_t (const trie_t&); - const trie_t &operator = (const trie_t&); - }; - -} - -#endif - diff --git a/src/wire.hpp b/src/wire.hpp index 6d1e11b..f840fce 100644 --- a/src/wire.hpp +++ b/src/wire.hpp @@ -23,6 +23,10 @@ #include "stdint.hpp" +// Protocol-related constants. +#define XS_CMD_SUBSCRIBE 1 +#define XS_CMD_UNSUBSCRIBE 2 + namespace xs { diff --git a/src/xpub.cpp b/src/xpub.cpp index 255c063..fe0b9a7 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -21,20 +21,27 @@ #include +#include "../include/xs.h" + #include "xpub.hpp" #include "pipe.hpp" +#include "wire.hpp" #include "err.hpp" #include "msg.hpp" xs::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), - more (false) + more (false), + tmp_filter_id (-1) { options.type = XS_XPUB; } xs::xpub_t::~xpub_t () { + // Deallocate all the filters. + for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it) + it->type->pf_destroy ((void*) (core_t*) this, it->instance); } void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) @@ -46,8 +53,27 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) // to all data on this pipe, implicitly. Also, if we are using // 0MQ/2.1-style protocol, there's no subscription forwarding. Thus, // we need to subscribe for all messages automatically. - if (icanhasall_ || pipe_->get_protocol () == 1) - subscriptions.add (NULL, 0, pipe_); + if (icanhasall_|| pipe_->get_protocol () == 1) { + + // Find the prefix filter. + // TODO: Change this to ALL filter. + filters_t::iterator it; + for (it = filters.begin (); it != filters.end (); ++it) + if (it->type->id (NULL) == XS_FILTER_PREFIX) + break; + if (it == filters.end ()) { + filter_t f; + f.type = get_filter (XS_FILTER_PREFIX); + xs_assert (f.type); + f.instance = f.type->pf_create ((void*) (core_t*) this); + xs_assert (f.instance); + filters.push_back (f); + it = filters.end () - 1; + } + + it->type->pf_subscribe ((void*) (core_t*) this, it->instance, pipe_, + NULL, 0); + } // The pipe is active when attached. Let's read the subscriptions from // it, if any. @@ -73,16 +99,72 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_) // TODO: In the case of malformed subscription we will simply ignore // it for now. However, we should close the connection instead. - if (size <= 0 || (*data == 0 && *data == 1)) { + if (size < 4) { + sub.close (); + return; + } + +#if 0 + // TODO: This is 0MQ/3.1 protocol. + if (size < 1) { sub.close (); return; } +#endif + + int cmd = get_uint16 (data); + int filter_id = get_uint16 (data + 2); + +#if 0 + // TODO: This is 0MQ/3.1 protocol. + int cmd = data [0] ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE; + int filter_id = XS_FILTER_PREFIX; +#endif + + if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) { + sub.close (); + return; + } + + // Find the relevant filter. + filters_t::iterator it; + for (it = filters.begin (); it != filters.end (); ++it) + if (it->type->id (NULL) == filter_id) + break; bool unique; - if (*data == 0) - unique = subscriptions.rm (data + 1, size - 1, pipe_); - else - unique = subscriptions.add (data + 1, size - 1, pipe_); + if (cmd == XS_CMD_UNSUBSCRIBE) { + xs_assert (it != filters.end ()); + unique = it->type->pf_unsubscribe ((void*) (core_t*) this, + it->instance, pipe_, data + 4, size - 4) ? true : false; +#if 0 + // TODO: This is 0MQ/3.1 protocol. + unique = it->type->pf_unsubscribe ((void*) (core_t*) this, + it->instance, pipe_, data + 1, size - 1) ? true : false; +#endif + } + else { + + // If the filter of the specified type does not exist yet, + // create it. + if (it == filters.end ()) { + filter_t f; + f.type = get_filter (filter_id); + xs_assert (f.type); + f.instance = f.type->pf_create ((void*) (core_t*) this); + xs_assert (f.instance); + filters.push_back (f); + it = filters.end () - 1; + } + + unique = it->type->pf_subscribe ((void*) (core_t*) this, + it->instance, pipe_, data + 4, size - 4) ? true : false; +#if 0 + // TODO: This is 0MQ/3.1 protocol. + unique = it->type->pf_subscribe ((void*) (core_t*) this, + it->instance, pipe_, data + 1, size - 1) ? true : false; +#endif + } // If the subscription is not a duplicate store it so that it can be // passed to used on next recv call. @@ -100,28 +182,29 @@ void xs::xpub_t::xwrite_activated (pipe_t *pipe_) void xs::xpub_t::xterminated (pipe_t *pipe_) { - // Remove the pipe from the trie. If there are topics that nobody - // is interested in anymore, send corresponding unsubscriptions - // upstream. - subscriptions.rm (pipe_, send_unsubscription, this); + // Remove the pipe from all the filters. + for (filters_t::iterator it = filters.begin (); it != filters.end (); + ++it) { + tmp_filter_id = it->type->id (NULL); + it->type->pf_unsubscribe_all ((void*) (core_t*) this, it->instance, + (void*) pipe_); + tmp_filter_id = -1; + } dist.terminated (pipe_); } -void xs::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) -{ - xpub_t *self = (xpub_t*) arg_; - self->dist.match (pipe_); -} - int xs::xpub_t::xsend (msg_t *msg_, int flags_) { bool msg_more = msg_->flags () & msg_t::more ? true : false; // For the first part of multi-part message, find the matching pipes. - if (!more) - subscriptions.match ((unsigned char*) msg_->data (), msg_->size (), - mark_as_matching, this); + if (!more) { + for (filters_t::iterator it = filters.begin (); it != filters.end (); + ++it) + it->type->pf_match ((void*) (core_t*) this, it->instance, + (unsigned char*) msg_->data (), msg_->size ()); + } // Send the message to all the pipes that were marked as matching // in the previous step. @@ -167,21 +250,35 @@ bool xs::xpub_t::xhas_in () return !pending.empty (); } -void xs::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, - void *arg_) +int xs::xpub_t::filter_unsubscribed (const unsigned char *data_, size_t size_) { - xpub_t *self = (xpub_t*) arg_; - - if (self->options.type != XS_PUB) { + // In XS_PUB socket, the subscriptions are not passed upstream. + if (options.type != XS_PUB) { // Place the unsubscription to the queue of pending (un)sunscriptions // to be retrived by the user later on. - xpub_t *self = (xpub_t*) arg_; + blob_t unsub (size_ + 4, 0); + put_uint16 ((unsigned char*) unsub.data (), XS_CMD_UNSUBSCRIBE); + put_uint16 ((unsigned char*) unsub.data () + 2, tmp_filter_id); + memcpy ((void*) (unsub.data () + 4), data_, size_); + +#if 0 + // TODO: This is 0MQ/3.1 protocol. blob_t unsub (size_ + 1, 0); unsub [0] = 0; memcpy ((void*) (unsub.data () + 1), data_, size_); - self->pending.push_back (unsub); +#endif + + pending.push_back (unsub); } + + return 0; +} + +int xs::xpub_t::filter_matching (void *subscriber_) +{ + dist.match ((xs::pipe_t*) subscriber_); + return 0; } xs::xpub_session_t::xpub_session_t (io_thread_t *io_thread_, bool connect_, diff --git a/src/xpub.hpp b/src/xpub.hpp index 7f29dcc..c0e47ff 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -21,15 +21,17 @@ #ifndef __XS_XPUB_HPP_INCLUDED__ #define __XS_XPUB_HPP_INCLUDED__ +#include "../include/xs.h" + #include #include #include "socket_base.hpp" #include "session_base.hpp" -#include "mtrie.hpp" #include "array.hpp" #include "dist.hpp" #include "blob.hpp" +#include "core.hpp" namespace xs { @@ -39,8 +41,7 @@ namespace xs class pipe_t; class io_thread_t; - class xpub_t : - public socket_base_t + class xpub_t : public socket_base_t, public core_t { public: @@ -59,16 +60,18 @@ namespace xs private: - // Function to be applied to the trie to send all the subsciptions - // upstream. - static void send_unsubscription (unsigned char *data_, size_t size_, - void *arg_); - - // Function to be applied to each matching pipes. - static void mark_as_matching (xs::pipe_t *pipe_, void *arg_); + // Overloaded functions from core_t. + int filter_unsubscribed (const unsigned char *data_, size_t size_); + int filter_matching (void *subscriber_); - // List of all subscriptions mapped to corresponding pipes. - mtrie_t subscriptions; + // The repository of subscriptions. + struct filter_t + { + xs_filter_t *type; + void *instance; + }; + typedef std::vector filters_t; + filters_t filters; // Distributor of messages holding the list of outbound pipes. dist_t dist; @@ -81,6 +84,9 @@ namespace xs typedef std::deque pending_t; pending_t pending; + // Different values stored while filter extensions are being executed. + int tmp_filter_id; + xpub_t (const xpub_t&); const xpub_t &operator = (const xpub_t&); }; diff --git a/src/xsub.cpp b/src/xsub.cpp index af6789f..da56586 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -23,11 +23,14 @@ #include "xsub.hpp" #include "err.hpp" +#include "wire.hpp" xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), has_message (false), - more (false) + more (false), + tmp_pipe (NULL), + tmp_filter_id (-1) { options.type = XS_XSUB; @@ -44,6 +47,10 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : xs::xsub_t::~xsub_t () { + // Deallocate all the filters. + for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it) + it->type->sf_destroy ((void*) (core_t*) this, it->instance); + int rc = message.close (); errno_assert (rc == 0); } @@ -59,8 +66,15 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) dist.attach (pipe_); // Send all the cached subscriptions to the new upstream peer. - subscriptions.apply (send_subscription, pipe_); + tmp_pipe = pipe_; + for (filters_t::iterator it = filters.begin (); it != filters.end (); + ++it) { + tmp_filter_id = it->type->id (NULL); + it->type->sf_enumerate ((void*) (core_t*) this, it->instance); + tmp_filter_id = -1; + } pipe_->flush (); + tmp_pipe = NULL; } void xs::xsub_t::xread_activated (pipe_t *pipe_) @@ -82,11 +96,17 @@ void xs::xsub_t::xterminated (pipe_t *pipe_) void xs::xsub_t::xhiccuped (pipe_t *pipe_) { + // Send all the cached subscriptions to the hiccuped pipe. if (pipe_->get_protocol () != 1) { - - // Send all the cached subscriptions to the hiccuped pipe. - subscriptions.apply (send_subscription, pipe_); + tmp_pipe = pipe_; + for (filters_t::iterator it = filters.begin (); it != filters.end (); + ++it) { + tmp_filter_id = it->type->id (NULL); + it->type->sf_enumerate ((void*) (core_t*) this, it->instance); + tmp_filter_id = -1; + } pipe_->flush (); + tmp_pipe = NULL; } } @@ -95,21 +115,69 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_) size_t size = msg_->size (); unsigned char *data = (unsigned char*) msg_->data (); - // Malformed subscriptions. - if (size < 1 || (*data != 0 && *data != 1)) { + if (size < 4) { + errno = EINVAL; + return -1; + } + int cmd = get_uint16 (data); + int filter_id = get_uint16 (data + 2); + +#if 0 + // TODO: This is 0MQ/3.1 protocol. + if (size < 1) { + errno = EINVAL; + return -1; + } + int cmd = data [0] ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE; + int filter_id = XS_FILTER_PREFIX; +#endif + + if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) { errno = EINVAL; return -1; } + + // Find the relevant filter. + filters_t::iterator it; + for (it = filters.begin (); it != filters.end (); ++it) + if (it->type->id (NULL) == filter_id) + break; + + // Process the subscription. + if (cmd == XS_CMD_SUBSCRIBE) { + + // If the filter of the specified type does not exist yet, create it. + if (it == filters.end ()) { + filter_t f; + f.type = get_filter (filter_id); + xs_assert (f.type); + f.instance = f.type->sf_create ((void*) (core_t*) this); + xs_assert (f.instance); + filters.push_back (f); + it = filters.end () - 1; + } - // Process the subscription. - if (*data == 1) { - if (subscriptions.add (data + 1, size - 1)) + if (it->type->sf_subscribe ((void*) (core_t*) this, + it->instance, data + 4, size - 4) == 1) +#if 0 + // TODO: This is 0MQ/3.1 protocol. + if (it->type->sf_subscribe ((void*) (core_t*) this, + it->instance, data + 1, size - 1) == 1) +#endif return dist.send_to_all (msg_, flags_); else return 0; } - else if (*data == 0) { - if (subscriptions.rm (data + 1, size - 1)) + else if (cmd == XS_CMD_UNSUBSCRIBE) { + xs_assert (it != filters.end ()); + + if (it->type->sf_unsubscribe ((void*) (core_t*) this, + it->instance, data + 4, size - 4) == 1) +#if 0 + // TODO: This is 0MQ/3.1 protocol. + if (it->type->sf_unsubscribe ((void*) (core_t*) this, + it->instance, data + 1, size - 1) == 1) +#endif return dist.send_to_all (msg_, flags_); else return 0; @@ -208,24 +276,37 @@ bool xs::xsub_t::xhas_in () bool xs::xsub_t::match (msg_t *msg_) { - return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ()); + for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it) + if (it->type->sf_match ((void*) (core_t*) this, it->instance, + (unsigned char*) msg_->data (), msg_->size ())) + return true; + return false; } -void xs::xsub_t::send_subscription (unsigned char *data_, size_t size_, - void *arg_) +int xs::xsub_t::filter_subscribed (const unsigned char *data_, size_t size_) { - pipe_t *pipe = (pipe_t*) arg_; - // Create the subsctription message. msg_t msg; + int rc = msg.init_size (size_ + 4); + xs_assert (rc == 0); + unsigned char *data = (unsigned char*) msg.data (); + put_uint16 (data, XS_CMD_SUBSCRIBE); + put_uint16 (data + 2, tmp_filter_id); + memcpy (data + 4, data_, size_); + +#if 0 + // TODO: This is 0MQ/3.1 protocol. + xs_assert (tmp_filter_id == XS_FILTER_PREFIX); + msg_t msg; int rc = msg.init_size (size_ + 1); xs_assert (rc == 0); unsigned char *data = (unsigned char*) msg.data (); data [0] = 1; memcpy (data + 1, data_, size_); +#endif // Send it to the pipe. - bool sent = pipe->write (&msg); + bool sent = tmp_pipe->write (&msg); // If we reached the SNDHWM, and thus cannot send the subscription, drop // the subscription message instead. This matches the behaviour of @@ -233,6 +314,8 @@ void xs::xsub_t::send_subscription (unsigned char *data_, size_t size_, // when the SNDHWM is reached. if (!sent) msg.close (); + + return 0; } xs::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_, diff --git a/src/xsub.hpp b/src/xsub.hpp index 07fe026..4621570 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -21,11 +21,15 @@ #ifndef __XS_XSUB_HPP_INCLUDED__ #define __XS_XSUB_HPP_INCLUDED__ +#include + +#include "../include/xs.h" + #include "socket_base.hpp" #include "session_base.hpp" #include "dist.hpp" +#include "core.hpp" #include "fq.hpp" -#include "trie.hpp" namespace xs { @@ -34,8 +38,7 @@ namespace xs class pipe_t; class io_thread_t; - class xsub_t : - public socket_base_t + class xsub_t : public socket_base_t, public core_t { public: @@ -57,14 +60,12 @@ namespace xs private: + // Overloads from core_t class. + int filter_subscribed (const unsigned char *data_, size_t size_); + // Check whether the message matches at least one subscription. bool match (xs::msg_t *msg_); - // Function to be applied to the trie to send all the subsciptions - // upstream. - static void send_subscription (unsigned char *data_, size_t size_, - void *arg_); - // Fair queueing object for inbound pipes. fq_t fq; @@ -72,7 +73,13 @@ namespace xs dist_t dist; // The repository of subscriptions. - trie_t subscriptions; + struct filter_t + { + xs_filter_t *type; + void *instance; + }; + typedef std::vector filters_t; + filters_t filters; // If true, 'message' contains a matching message to return on the // next recv call. @@ -83,6 +90,10 @@ namespace xs // there are following parts still waiting. bool more; + // Different values stored while filter extensions are being executed. + pipe_t *tmp_pipe; + int tmp_filter_id; + xsub_t (const xsub_t&); const xsub_t &operator = (const xsub_t&); }; -- cgit v1.2.3