From d82cbb3a81f116cd22e9895ecac36ac3d7b38929 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 5 Apr 2012 07:32:58 +0200 Subject: XS_PLUGIN and XS_FILTER implementation This patch introduces following features: - XS_PLUGIN context option to add plugins to libxs - XS_FILTER option to switch between different filter types - Automatic loading of plug-ins is *not* implemented. From the implementation point of view: - standard prefix filter is implemented as a pluggable filter - trie_t and mtrie_t are joined into a single class - the code for 0MQ/3.1 compatibility is left in in the form of comments - new test for testing re-subscriptions is added Signed-off-by: Martin Sustrik --- src/ctx.cpp | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) (limited to 'src/ctx.cpp') diff --git a/src/ctx.cpp b/src/ctx.cpp index 21dec58..46fa984 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -35,6 +35,7 @@ #include "pipe.hpp" #include "err.hpp" #include "msg.hpp" +#include "prefix_filter.hpp" xs::ctx_t::ctx_t () : tag (0xbadcafe0), @@ -46,6 +47,9 @@ xs::ctx_t::ctx_t () : max_sockets (512), io_thread_count (1) { + // Plug in the standard plugins. + int rc = plug (prefix_filter); + xs_assert (rc == 0); } bool xs::ctx_t::check_tag () @@ -124,6 +128,28 @@ int xs::ctx_t::terminate () return 0; } +int xs::ctx_t::plug (const void *ext_) +{ + if (!ext_) { + errno = EFAULT; + return -1; + } + + // The extension is a message filter plug-in. + xs_filter_t *filter = (xs_filter_t*) ext_; + if (filter->type == XS_PLUGIN_FILTER && filter->version == 1) { + opt_sync.lock (); + filters [filter->id (NULL)] = filter; + opt_sync.unlock (); + return 0; + } + + // Specified extension type is not supported by this version of + // the library. + errno = ENOTSUP; + return -1; +} + int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) { switch (option_) { @@ -145,6 +171,8 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) io_thread_count = *((int*) optval_); opt_sync.unlock (); break; + case XS_PLUGIN: + return plug (optval_); default: errno = EINVAL; return -1; @@ -256,6 +284,17 @@ xs::object_t *xs::ctx_t::get_reaper () return reaper; } +xs_filter_t *xs::ctx_t::get_filter (int filter_id_) +{ + xs_filter_t *result = NULL; + opt_sync.lock (); + filters_t::iterator it = filters.find (filter_id_); + if (it != filters.end ()) + result = it->second; + opt_sync.unlock (); + return result; +} + void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_) { slots [tid_]->send (command_); -- cgit v1.2.3