summaryrefslogtreecommitdiff
path: root/src/ctx.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-04-05 07:32:58 +0200
committerMartin Sustrik <sustrik@250bpm.com>2012-04-05 07:32:58 +0200
commitd82cbb3a81f116cd22e9895ecac36ac3d7b38929 (patch)
tree03c923311b937f550bec325d131476513a02bebf /src/ctx.cpp
parent52b8a917deb2990e7197b82e81e0258ebe30f424 (diff)
XS_PLUGIN and XS_FILTER implementation
This patch introduces following features: - XS_PLUGIN context option to add plugins to libxs - XS_FILTER option to switch between different filter types - Automatic loading of plug-ins is *not* implemented. From the implementation point of view: - standard prefix filter is implemented as a pluggable filter - trie_t and mtrie_t are joined into a single class - the code for 0MQ/3.1 compatibility is left in in the form of comments - new test for testing re-subscriptions is added Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/ctx.cpp')
-rw-r--r--src/ctx.cpp39
1 files changed, 39 insertions, 0 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 21dec58..46fa984 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -35,6 +35,7 @@
#include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp"
+#include "prefix_filter.hpp"
xs::ctx_t::ctx_t () :
tag (0xbadcafe0),
@@ -46,6 +47,9 @@ xs::ctx_t::ctx_t () :
max_sockets (512),
io_thread_count (1)
{
+ // Plug in the standard plugins.
+ int rc = plug (prefix_filter);
+ xs_assert (rc == 0);
}
bool xs::ctx_t::check_tag ()
@@ -124,6 +128,28 @@ int xs::ctx_t::terminate ()
return 0;
}
+int xs::ctx_t::plug (const void *ext_)
+{
+ if (!ext_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ // The extension is a message filter plug-in.
+ xs_filter_t *filter = (xs_filter_t*) ext_;
+ if (filter->type == XS_PLUGIN_FILTER && filter->version == 1) {
+ opt_sync.lock ();
+ filters [filter->id (NULL)] = filter;
+ opt_sync.unlock ();
+ return 0;
+ }
+
+ // Specified extension type is not supported by this version of
+ // the library.
+ errno = ENOTSUP;
+ return -1;
+}
+
int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_)
{
switch (option_) {
@@ -145,6 +171,8 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_)
io_thread_count = *((int*) optval_);
opt_sync.unlock ();
break;
+ case XS_PLUGIN:
+ return plug (optval_);
default:
errno = EINVAL;
return -1;
@@ -256,6 +284,17 @@ xs::object_t *xs::ctx_t::get_reaper ()
return reaper;
}
+xs_filter_t *xs::ctx_t::get_filter (int filter_id_)
+{
+ xs_filter_t *result = NULL;
+ opt_sync.lock ();
+ filters_t::iterator it = filters.find (filter_id_);
+ if (it != filters.end ())
+ result = it->second;
+ opt_sync.unlock ();
+ return result;
+}
+
void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{
slots [tid_]->send (command_);