From d82cbb3a81f116cd22e9895ecac36ac3d7b38929 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 5 Apr 2012 07:32:58 +0200 Subject: XS_PLUGIN and XS_FILTER implementation This patch introduces following features: - XS_PLUGIN context option to add plugins to libxs - XS_FILTER option to switch between different filter types - Automatic loading of plug-ins is *not* implemented. From the implementation point of view: - standard prefix filter is implemented as a pluggable filter - trie_t and mtrie_t are joined into a single class - the code for 0MQ/3.1 compatibility is left in in the form of comments - new test for testing re-subscriptions is added Signed-off-by: Martin Sustrik --- src/xpub.hpp | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) (limited to 'src/xpub.hpp') diff --git a/src/xpub.hpp b/src/xpub.hpp index 7f29dcc..c0e47ff 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -21,15 +21,17 @@ #ifndef __XS_XPUB_HPP_INCLUDED__ #define __XS_XPUB_HPP_INCLUDED__ +#include "../include/xs.h" + #include #include #include "socket_base.hpp" #include "session_base.hpp" -#include "mtrie.hpp" #include "array.hpp" #include "dist.hpp" #include "blob.hpp" +#include "core.hpp" namespace xs { @@ -39,8 +41,7 @@ namespace xs class pipe_t; class io_thread_t; - class xpub_t : - public socket_base_t + class xpub_t : public socket_base_t, public core_t { public: @@ -59,16 +60,18 @@ namespace xs private: - // Function to be applied to the trie to send all the subsciptions - // upstream. - static void send_unsubscription (unsigned char *data_, size_t size_, - void *arg_); - - // Function to be applied to each matching pipes. - static void mark_as_matching (xs::pipe_t *pipe_, void *arg_); + // Overloaded functions from core_t. + int filter_unsubscribed (const unsigned char *data_, size_t size_); + int filter_matching (void *subscriber_); - // List of all subscriptions mapped to corresponding pipes. - mtrie_t subscriptions; + // The repository of subscriptions. + struct filter_t + { + xs_filter_t *type; + void *instance; + }; + typedef std::vector filters_t; + filters_t filters; // Distributor of messages holding the list of outbound pipes. dist_t dist; @@ -81,6 +84,9 @@ namespace xs typedef std::deque pending_t; pending_t pending; + // Different values stored while filter extensions are being executed. + int tmp_filter_id; + xpub_t (const xpub_t&); const xpub_t &operator = (const xpub_t&); }; -- cgit v1.2.3