diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2012-04-05 07:32:58 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2012-04-05 07:32:58 +0200 |
commit | d82cbb3a81f116cd22e9895ecac36ac3d7b38929 (patch) | |
tree | 03c923311b937f550bec325d131476513a02bebf /src/ctx.cpp | |
parent | 52b8a917deb2990e7197b82e81e0258ebe30f424 (diff) |
XS_PLUGIN and XS_FILTER implementation
This patch introduces following features:
- XS_PLUGIN context option to add plugins to libxs
- XS_FILTER option to switch between different filter types
- Automatic loading of plug-ins is *not* implemented.
From the implementation point of view:
- standard prefix filter is implemented as a pluggable filter
- trie_t and mtrie_t are joined into a single class
- the code for 0MQ/3.1 compatibility is left in in the form of comments
- new test for testing re-subscriptions is added
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/ctx.cpp')
-rw-r--r-- | src/ctx.cpp | 39 |
1 files changed, 39 insertions, 0 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp index 21dec58..46fa984 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -35,6 +35,7 @@ #include "pipe.hpp" #include "err.hpp" #include "msg.hpp" +#include "prefix_filter.hpp" xs::ctx_t::ctx_t () : tag (0xbadcafe0), @@ -46,6 +47,9 @@ xs::ctx_t::ctx_t () : max_sockets (512), io_thread_count (1) { + // Plug in the standard plugins. + int rc = plug (prefix_filter); + xs_assert (rc == 0); } bool xs::ctx_t::check_tag () @@ -124,6 +128,28 @@ int xs::ctx_t::terminate () return 0; } +int xs::ctx_t::plug (const void *ext_) +{ + if (!ext_) { + errno = EFAULT; + return -1; + } + + // The extension is a message filter plug-in. + xs_filter_t *filter = (xs_filter_t*) ext_; + if (filter->type == XS_PLUGIN_FILTER && filter->version == 1) { + opt_sync.lock (); + filters [filter->id (NULL)] = filter; + opt_sync.unlock (); + return 0; + } + + // Specified extension type is not supported by this version of + // the library. + errno = ENOTSUP; + return -1; +} + int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) { switch (option_) { @@ -145,6 +171,8 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) io_thread_count = *((int*) optval_); opt_sync.unlock (); break; + case XS_PLUGIN: + return plug (optval_); default: errno = EINVAL; return -1; @@ -256,6 +284,17 @@ xs::object_t *xs::ctx_t::get_reaper () return reaper; } +xs_filter_t *xs::ctx_t::get_filter (int filter_id_) +{ + xs_filter_t *result = NULL; + opt_sync.lock (); + filters_t::iterator it = filters.find (filter_id_); + if (it != filters.end ()) + result = it->second; + opt_sync.unlock (); + return result; +} + void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_) { slots [tid_]->send (command_); |