diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 8 | ||||
-rw-r--r-- | src/core.cpp | 68 | ||||
-rw-r--r-- | src/core.hpp | 53 | ||||
-rw-r--r-- | src/ctx.cpp | 39 | ||||
-rw-r--r-- | src/ctx.hpp | 14 | ||||
-rw-r--r-- | src/mtrie.cpp | 436 | ||||
-rw-r--r-- | src/mtrie.hpp | 93 | ||||
-rw-r--r-- | src/object.cpp | 5 | ||||
-rw-r--r-- | src/object.hpp | 5 | ||||
-rw-r--r-- | src/options.cpp | 20 | ||||
-rw-r--r-- | src/options.hpp | 3 | ||||
-rw-r--r-- | src/prefix_filter.cpp | 664 | ||||
-rw-r--r-- | src/prefix_filter.hpp | 108 | ||||
-rw-r--r-- | src/sub.cpp | 19 | ||||
-rw-r--r-- | src/trie.cpp | 338 | ||||
-rw-r--r-- | src/trie.hpp | 79 | ||||
-rw-r--r-- | src/wire.hpp | 4 | ||||
-rw-r--r-- | src/xpub.cpp | 153 | ||||
-rw-r--r-- | src/xpub.hpp | 30 | ||||
-rw-r--r-- | src/xsub.cpp | 119 | ||||
-rw-r--r-- | src/xsub.hpp | 29 |
21 files changed, 1267 insertions, 1020 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index fdfa8d1..3c260be 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -23,6 +23,7 @@ libxs_la_SOURCES = \ clock.hpp \ command.hpp \ config.hpp \ + core.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ @@ -44,7 +45,6 @@ libxs_la_SOURCES = \ likely.hpp \ mailbox.hpp \ msg.hpp \ - mtrie.hpp \ mutex.hpp \ object.hpp \ options.hpp \ @@ -56,6 +56,7 @@ libxs_la_SOURCES = \ platform.hpp \ poll.hpp \ pair.hpp \ + prefix_filter.hpp \ pub.hpp \ pull.hpp \ push.hpp \ @@ -74,7 +75,6 @@ libxs_la_SOURCES = \ tcp_connecter.hpp \ tcp_listener.hpp \ thread.hpp \ - trie.hpp \ upoll.hpp \ windows.hpp \ wire.hpp \ @@ -85,6 +85,7 @@ libxs_la_SOURCES = \ ypipe.hpp \ yqueue.hpp \ clock.cpp \ + core.cpp \ ctx.cpp \ decoder.cpp \ devpoll.cpp \ @@ -103,7 +104,6 @@ libxs_la_SOURCES = \ lb.cpp \ mailbox.cpp \ msg.cpp \ - mtrie.cpp \ object.cpp \ options.cpp \ own.cpp \ @@ -113,6 +113,7 @@ libxs_la_SOURCES = \ pgm_socket.cpp \ pipe.cpp \ poll.cpp \ + prefix_filter.cpp \ pull.cpp \ push.cpp \ reaper.cpp \ @@ -130,7 +131,6 @@ libxs_la_SOURCES = \ tcp_connecter.cpp \ tcp_listener.cpp \ thread.cpp \ - trie.cpp \ upoll.cpp \ xpub.cpp \ xrep.cpp \ diff --git a/src/core.cpp b/src/core.cpp new file mode 100644 index 0000000..2d16167 --- /dev/null +++ b/src/core.cpp @@ -0,0 +1,68 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../include/xs.h" + +#include "core.hpp" +#include "err.hpp" + +xs::core_t::core_t () +{ +} + +xs::core_t::~core_t () +{ +} + +int xs::core_t::filter_subscribed (const unsigned char *data_, size_t size_) +{ + errno = ENOTSUP; + return -1; +} + +int xs::core_t::filter_unsubscribed (const unsigned char *data_, size_t size_) +{ + errno = ENOTSUP; + return -1; +} + +int xs::core_t::filter_matching (void *subscriber_) +{ + errno = ENOTSUP; + return -1; +} + +int xs_filter_subscribed (void *core_, + const unsigned char *data_, size_t size_) +{ + return ((xs::core_t*) core_)->filter_subscribed (data_, size_); +} + +int xs_filter_unsubscribed (void *core_, + const unsigned char *data_, size_t size_) +{ + return ((xs::core_t*) core_)->filter_unsubscribed (data_, size_); +} + +int xs_filter_matching (void *core_, void *subscriber_) +{ + return ((xs::core_t*) core_)->filter_matching (subscriber_); +} + diff --git a/src/core.hpp b/src/core.hpp new file mode 100644 index 0000000..fb50646 --- /dev/null +++ b/src/core.hpp @@ -0,0 +1,53 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __XS_CORE_HPP_INCLUDED__ +#define __XS_CORE_HPP_INCLUDED__ + +#include <stddef.h> + +namespace xs +{ + + // This class is not a core of Crossroads. It's rather a callback interface + // for extensions, ie. what's extensions see as Crossroads core. + + class core_t + { + public: + + core_t (); + virtual ~core_t (); + + virtual int filter_subscribed (const unsigned char *data_, + size_t size_); + virtual int filter_unsubscribed (const unsigned char *data_, + size_t size_); + virtual int filter_matching (void *subscriber_); + + private: + + core_t (const core_t&); + const core_t &operator = (const core_t&); + }; + +} + +#endif diff --git a/src/ctx.cpp b/src/ctx.cpp index 21dec58..46fa984 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -35,6 +35,7 @@ #include "pipe.hpp" #include "err.hpp" #include "msg.hpp" +#include "prefix_filter.hpp" xs::ctx_t::ctx_t () : tag (0xbadcafe0), @@ -46,6 +47,9 @@ xs::ctx_t::ctx_t () : max_sockets (512), io_thread_count (1) { + // Plug in the standard plugins. + int rc = plug (prefix_filter); + xs_assert (rc == 0); } bool xs::ctx_t::check_tag () @@ -124,6 +128,28 @@ int xs::ctx_t::terminate () return 0; } +int xs::ctx_t::plug (const void *ext_) +{ + if (!ext_) { + errno = EFAULT; + return -1; + } + + // The extension is a message filter plug-in. + xs_filter_t *filter = (xs_filter_t*) ext_; + if (filter->type == XS_PLUGIN_FILTER && filter->version == 1) { + opt_sync.lock (); + filters [filter->id (NULL)] = filter; + opt_sync.unlock (); + return 0; + } + + // Specified extension type is not supported by this version of + // the library. + errno = ENOTSUP; + return -1; +} + int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) { switch (option_) { @@ -145,6 +171,8 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) io_thread_count = *((int*) optval_); opt_sync.unlock (); break; + case XS_PLUGIN: + return plug (optval_); default: errno = EINVAL; return -1; @@ -256,6 +284,17 @@ xs::object_t *xs::ctx_t::get_reaper () return reaper; } +xs_filter_t *xs::ctx_t::get_filter (int filter_id_) +{ + xs_filter_t *result = NULL; + opt_sync.lock (); + filters_t::iterator it = filters.find (filter_id_); + if (it != filters.end ()) + result = it->second; + opt_sync.unlock (); + return result; +} + void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_) { slots [tid_]->send (command_); diff --git a/src/ctx.hpp b/src/ctx.hpp index c4fa96a..54144ad 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -25,7 +25,8 @@ #include <map> #include <vector> #include <string> -#include <stdarg.h> + +#include "../include/xs.h" #include "mailbox.hpp" #include "array.hpp" @@ -88,6 +89,10 @@ namespace xs // Returns reaper thread object. xs::object_t *get_reaper (); + // Get the filter associated with the specified filter ID or NULL + // If such filter is not registered. + xs_filter_t *get_filter (int filter_id_); + // Management of inproc endpoints. int register_endpoint (const char *addr_, endpoint_t &endpoint_); void unregister_endpoints (xs::socket_base_t *socket_); @@ -102,6 +107,9 @@ namespace xs private: + // Plug in the extension specified. + int plug (const void *ext_); + // Used to check whether the object is a context. uint32_t tag; @@ -161,6 +169,10 @@ namespace xs // Synchronisation of access to context options. mutex_t opt_sync; + // List of all filters plugged into the context. + typedef std::map <int, xs_filter_t*> filters_t; + filters_t filters; + ctx_t (const ctx_t&); const ctx_t &operator = (const ctx_t&); }; diff --git a/src/mtrie.cpp b/src/mtrie.cpp deleted file mode 100644 index eae34c2..0000000 --- a/src/mtrie.cpp +++ /dev/null @@ -1,436 +0,0 @@ -/* - Copyright (c) 2011-2012 250bpm s.r.o. - Copyright (c) 2011-2012 Spotify AB - Copyright (c) 2011 Other contributors as noted in the AUTHORS file - - This file is part of Crossroads I/O project. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - Crossroads is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <stdlib.h> - -#include <new> -#include <algorithm> - -#include "platform.hpp" -#if defined XS_HAVE_WINDOWS -#include "windows.hpp" -#endif - -#include "err.hpp" -#include "pipe.hpp" -#include "mtrie.hpp" - -xs::mtrie_t::mtrie_t () : - pipes (0), - min (0), - count (0), - live_nodes (0) -{ -} - -xs::mtrie_t::~mtrie_t () -{ - if (pipes) { - delete pipes; - pipes = 0; - } - - if (count == 1) { - xs_assert (next.node); - delete next.node; - next.node = 0; - } - else if (count > 1) { - for (unsigned short i = 0; i != count; ++i) - if (next.table [i]) - delete next.table [i]; - free (next.table); - } -} - -bool xs::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_) -{ - return add_helper (prefix_, size_, pipe_); -} - -bool xs::mtrie_t::add_helper (unsigned char *prefix_, size_t size_, - pipe_t *pipe_) -{ - // We are at the node corresponding to the prefix. We are done. - if (!size_) { - bool result = !pipes; - if (!pipes) - pipes = new pipes_t; - pipes->insert (pipe_); - return result; - } - - unsigned char c = *prefix_; - if (c < min || c >= min + count) { - - // The character is out of range of currently handled - // charcters. We have to extend the table. - if (!count) { - min = c; - count = 1; - next.node = NULL; - } - else if (count == 1) { - unsigned char oldc = min; - mtrie_t *oldp = next.node; - count = (min < c ? c - min : min - c) + 1; - next.table = (mtrie_t**) - malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - for (unsigned short i = 0; i != count; ++i) - next.table [i] = 0; - min = std::min (min, c); - next.table [oldc - min] = oldp; - } - else if (min < c) { - - // The new character is above the current character range. - unsigned short old_count = count; - count = c - min + 1; - next.table = (mtrie_t**) realloc ((void*) next.table, - sizeof (mtrie_t*) * count); - xs_assert (next.table); - for (unsigned short i = old_count; i != count; i++) - next.table [i] = NULL; - } - else { - - // The new character is below the current character range. - unsigned short old_count = count; - count = (min + old_count) - c; - next.table = (mtrie_t**) realloc ((void*) next.table, - sizeof (mtrie_t*) * count); - xs_assert (next.table); - memmove (next.table + min - c, next.table, - old_count * sizeof (mtrie_t*)); - for (unsigned short i = 0; i != min - c; i++) - next.table [i] = NULL; - min = c; - } - } - - // If next node does not exist, create one. - if (count == 1) { - if (!next.node) { - next.node = new (std::nothrow) mtrie_t; - ++live_nodes; - xs_assert (next.node); - } - return next.node->add_helper (prefix_ + 1, size_ - 1, pipe_); - } - else { - if (!next.table [c - min]) { - next.table [c - min] = new (std::nothrow) mtrie_t; - ++live_nodes; - xs_assert (next.table [c - min]); - } - return next.table [c - min]->add_helper (prefix_ + 1, size_ - 1, pipe_); - } -} - - -void xs::mtrie_t::rm (pipe_t *pipe_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_) -{ - unsigned char *buff = NULL; - rm_helper (pipe_, &buff, 0, 0, func_, arg_); - free (buff); -} - -void xs::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, - size_t buffsize_, size_t maxbuffsize_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_) -{ - // Remove the subscription from this node. - if (pipes && pipes->erase (pipe_) && pipes->empty ()) { - func_ (*buff_, buffsize_, arg_); - delete pipes; - pipes = 0; - } - - // Adjust the buffer. - if (buffsize_ >= maxbuffsize_) { - maxbuffsize_ = buffsize_ + 256; - *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); - alloc_assert (*buff_); - } - - // If there are no subnodes in the trie, return. - if (count == 0) - return; - - // If there's one subnode (optimisation). - if (count == 1) { - (*buff_) [buffsize_] = min; - buffsize_++; - next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, - func_, arg_); - - // Prune the node if it was made redundant by the removal - if (next.node->is_redundant ()) { - delete next.node; - next.node = 0; - count = 0; - --live_nodes; - xs_assert (live_nodes == 0); - } - return; - } - - // If there are multiple subnodes. - // - // New min non-null character in the node table after the removal - unsigned char new_min = min + count - 1; - // New max non-null character in the node table after the removal - unsigned char new_max = min; - for (unsigned short c = 0; c != count; c++) { - (*buff_) [buffsize_] = min + c; - if (next.table [c]) { - next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1, - maxbuffsize_, func_, arg_); - - // Prune redundant nodes from the mtrie - if (next.table [c]->is_redundant ()) { - delete next.table [c]; - next.table [c] = 0; - - xs_assert (live_nodes > 0); - --live_nodes; - } - else { - // The node is not redundant, so it's a candidate for being - // the new min/max node. - // - // We loop through the node array from left to right, so the - // first non-null, non-redundant node encountered is the new - // minimum index. Conversely, the last non-redundant, non-null - // node encountered is the new maximum index. - if (c + min < new_min) - new_min = c + min; - if (c + min > new_max) - new_max = c + min; - } - } - } - - xs_assert (count > 1); - - // Compact the node table if possible - if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - xs_assert (new_min == new_max); - xs_assert (new_min >= min && new_min < min + count); - mtrie_t *node = next.table [new_min - min]; - xs_assert (node); - free (next.table); - next.node = node; - count = 1; - min = new_min; - } - else if (live_nodes > 1 && (new_min > min || new_max < min + count - 1)) { - xs_assert (new_max - new_min + 1 > 1); - - mtrie_t **old_table = next.table; - xs_assert (new_min > min || new_max < min + count - 1); - xs_assert (new_min >= min); - xs_assert (new_max <= min + count - 1); - xs_assert (new_max - new_min + 1 < count); - - count = new_max - new_min + 1; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - - memmove (next.table, old_table + (new_min - min), - sizeof (mtrie_t*) * count); - free (old_table); - - min = new_min; - } -} - -bool xs::mtrie_t::rm (unsigned char *prefix_, size_t size_, pipe_t *pipe_) -{ - return rm_helper (prefix_, size_, pipe_); -} - -bool xs::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_, - pipe_t *pipe_) -{ - if (!size_) { - if (pipes) { - pipes_t::size_type erased = pipes->erase (pipe_); - xs_assert (erased == 1); - if (pipes->empty ()) { - delete pipes; - pipes = 0; - } - } - return !pipes; - } - - unsigned char c = *prefix_; - if (!count || c < min || c >= min + count) - return false; - - mtrie_t *next_node = - count == 1 ? next.node : next.table [c - min]; - - if (!next_node) - return false; - - bool ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_); - - if (next_node->is_redundant ()) { - delete next_node; - xs_assert (count > 0); - - if (count == 1) { - next.node = 0; - count = 0; - --live_nodes; - xs_assert (live_nodes == 0); - } - else { - next.table [c - min] = 0; - xs_assert (live_nodes > 1); - --live_nodes; - - // Compact the table if possible - if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - mtrie_t *node = 0; - for (unsigned short i = 0; i < count; ++i) { - if (next.table [i]) { - node = next.table [i]; - min = i + min; - break; - } - } - - xs_assert (node); - free (next.table); - next.node = node; - count = 1; - } - else if (c == min) { - // We can compact the table "from the left" - unsigned char new_min = min; - for (unsigned short i = 1; i < count; ++i) { - if (next.table [i]) { - new_min = i + min; - break; - } - } - xs_assert (new_min != min); - - mtrie_t **old_table = next.table; - xs_assert (new_min > min); - xs_assert (count > new_min - min); - - count = count - (new_min - min); - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - - memmove (next.table, old_table + (new_min - min), - sizeof (mtrie_t*) * count); - free (old_table); - - min = new_min; - } - else if (c == min + count - 1) { - // We can compact the table "from the right" - unsigned short new_count = count; - for (unsigned short i = 1; i < count; ++i) { - if (next.table [count - 1 - i]) { - new_count = count - i; - break; - } - } - xs_assert (new_count != count); - count = new_count; - - mtrie_t **old_table = next.table; - next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count); - xs_assert (next.table); - - memmove (next.table, old_table, sizeof (mtrie_t*) * count); - free (old_table); - } - } - } - - return ret; -} - -void xs::mtrie_t::match (unsigned char *data_, size_t size_, - void (*func_) (pipe_t *pipe_, void *arg_), void *arg_) -{ - mtrie_t *current = this; - while (true) { - - // Signal the pipes attached to this node. - if (current->pipes) { - for (pipes_t::iterator it = current->pipes->begin (); - it != current->pipes->end (); ++it) - func_ (*it, arg_); - } - - // If we are at the end of the message, there's nothing more to match. - if (!size_) - break; - - // If there are no subnodes in the trie, return. - if (current->count == 0) - break; - - // If there's one subnode (optimisation). - if (current->count == 1) { - if (data_ [0] != current->min) - break; - current = current->next.node; - data_++; - size_--; - continue; - } - - // If there are multiple subnodes. - if (data_ [0] < current->min || data_ [0] >= - current->min + current->count) - break; - if (!current->next.table [data_ [0] - current->min]) - break; - current = current->next.table [data_ [0] - current->min]; - data_++; - size_--; - } -} - -bool xs::mtrie_t::is_redundant () const -{ - return !pipes && live_nodes == 0; -} - diff --git a/src/mtrie.hpp b/src/mtrie.hpp deleted file mode 100644 index 1e56b4e..0000000 --- a/src/mtrie.hpp +++ /dev/null @@ -1,93 +0,0 @@ -/* - Copyright (c) 2011-2012 250bpm s.r.o. - Copyright (c) 2011-2012 Spotify AB - Copyright (c) 2011 Other contributors as noted in the AUTHORS file - - This file is part of Crossroads I/O project. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - Crossroads is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __XS_MTRIE_HPP_INCLUDED__ -#define __XS_MTRIE_HPP_INCLUDED__ - -#include <stddef.h> -#include <set> - -#include "stdint.hpp" - -namespace xs -{ - - class pipe_t; - - // Multi-trie. Each node in the trie is a set of pointers to pipes. - - class mtrie_t - { - public: - - mtrie_t (); - ~mtrie_t (); - - // Add key to the trie. Returns true if it's a new subscription - // rather than a duplicate. - bool add (unsigned char *prefix_, size_t size_, xs::pipe_t *pipe_); - - // Remove all subscriptions for a specific peer from the trie. - // If there are no subscriptions left on some topics, invoke the - // supplied callback function. - void rm (xs::pipe_t *pipe_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_); - - // Remove specific subscription from the trie. Return true is it was - // actually removed rather than de-duplicated. - bool rm (unsigned char *prefix_, size_t size_, xs::pipe_t *pipe_); - - // Signal all the matching pipes. - void match (unsigned char *data_, size_t size_, - void (*func_) (xs::pipe_t *pipe_, void *arg_), void *arg_); - - private: - - bool add_helper (unsigned char *prefix_, size_t size_, - xs::pipe_t *pipe_); - void rm_helper (xs::pipe_t *pipe_, unsigned char **buff_, - size_t buffsize_, size_t maxbuffsize_, - void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_); - bool rm_helper (unsigned char *prefix_, size_t size_, - xs::pipe_t *pipe_); - bool is_redundant () const; - - typedef std::set <xs::pipe_t*> pipes_t; - pipes_t *pipes; - - unsigned char min; - unsigned short count; - unsigned short live_nodes; - union { - class mtrie_t *node; - class mtrie_t **table; - } next; - - mtrie_t (const mtrie_t&); - const mtrie_t &operator = (const mtrie_t&); - }; - -} - -#endif - diff --git a/src/object.cpp b/src/object.cpp index 5c1ed84..4f04fe4 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -154,6 +154,11 @@ xs::io_thread_t *xs::object_t::choose_io_thread (uint64_t affinity_) return ctx->choose_io_thread (affinity_); } +xs_filter_t *xs::object_t::get_filter (int filter_id_) +{ + return ctx->get_filter (filter_id_); +} + void xs::object_t::send_stop () { // 'stop' command goes always from administrative thread to diff --git a/src/object.hpp b/src/object.hpp index 5b855a5..fabb156 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -22,6 +22,8 @@ #ifndef __XS_OBJECT_HPP_INCLUDED__ #define __XS_OBJECT_HPP_INCLUDED__ +#include "../include/xs.h" + #include "stdint.hpp" namespace xs @@ -64,6 +66,9 @@ namespace xs // Chooses least loaded I/O thread. xs::io_thread_t *choose_io_thread (uint64_t affinity_); + // Functions related to extensions. + xs_filter_t *get_filter (int filter_id_); + // Derived object can use these functions to send commands // to other objects. void send_stop (); diff --git a/src/options.cpp b/src/options.cpp index c9cbaae..fdbffde 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -23,6 +23,8 @@ #include <string.h> #include <limits> +#include "../include/xs.h" + #include "options.hpp" #include "err.hpp" @@ -47,6 +49,7 @@ xs::options_t::options_t () : ipv4only (1), keepalive (0), protocol (0), + filter_id (XS_FILTER_PREFIX), delay_on_close (true), delay_on_disconnect (true), filter (false), @@ -248,6 +251,14 @@ int xs::options_t::setsockopt (int option_, const void *optval_, return 0; } + case XS_FILTER: + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + filter_id = *((int*) optval_); + return 0; + } errno = EINVAL; @@ -438,6 +449,15 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (int); return 0; + case XS_FILTER: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = filter_id; + *optvallen_ = sizeof (int); + return 0; + } errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index c1e4dda..1288f72 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -95,6 +95,9 @@ namespace xs // Version of wire protocol to use. int protocol; + // Filter ID to be used with subscriptions and unsubscriptions. + int filter_id; + // If true, session reads all the pending messages from the pipe and // sends them to the network when socket is closed. bool delay_on_close; diff --git a/src/prefix_filter.cpp b/src/prefix_filter.cpp new file mode 100644 index 0000000..5d21fb6 --- /dev/null +++ b/src/prefix_filter.cpp @@ -0,0 +1,664 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2011-2012 Spotify AB + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have receiv |