diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2012-04-05 07:32:58 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2012-04-05 07:32:58 +0200 |
commit | d82cbb3a81f116cd22e9895ecac36ac3d7b38929 (patch) | |
tree | 03c923311b937f550bec325d131476513a02bebf /src/xpub.hpp | |
parent | 52b8a917deb2990e7197b82e81e0258ebe30f424 (diff) |
XS_PLUGIN and XS_FILTER implementation
This patch introduces following features:
- XS_PLUGIN context option to add plugins to libxs
- XS_FILTER option to switch between different filter types
- Automatic loading of plug-ins is *not* implemented.
From the implementation point of view:
- standard prefix filter is implemented as a pluggable filter
- trie_t and mtrie_t are joined into a single class
- the code for 0MQ/3.1 compatibility is left in in the form of comments
- new test for testing re-subscriptions is added
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xpub.hpp')
-rw-r--r-- | src/xpub.hpp | 30 |
1 files changed, 18 insertions, 12 deletions
diff --git a/src/xpub.hpp b/src/xpub.hpp index 7f29dcc..c0e47ff 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -21,15 +21,17 @@ #ifndef __XS_XPUB_HPP_INCLUDED__ #define __XS_XPUB_HPP_INCLUDED__ +#include "../include/xs.h" + #include <deque> #include <string> #include "socket_base.hpp" #include "session_base.hpp" -#include "mtrie.hpp" #include "array.hpp" #include "dist.hpp" #include "blob.hpp" +#include "core.hpp" namespace xs { @@ -39,8 +41,7 @@ namespace xs class pipe_t; class io_thread_t; - class xpub_t : - public socket_base_t + class xpub_t : public socket_base_t, public core_t { public: @@ -59,16 +60,18 @@ namespace xs private: - // Function to be applied to the trie to send all the subsciptions - // upstream. - static void send_unsubscription (unsigned char *data_, size_t size_, - void *arg_); - - // Function to be applied to each matching pipes. - static void mark_as_matching (xs::pipe_t *pipe_, void *arg_); + // Overloaded functions from core_t. + int filter_unsubscribed (const unsigned char *data_, size_t size_); + int filter_matching (void *subscriber_); - // List of all subscriptions mapped to corresponding pipes. - mtrie_t subscriptions; + // The repository of subscriptions. + struct filter_t + { + xs_filter_t *type; + void *instance; + }; + typedef std::vector <filter_t> filters_t; + filters_t filters; // Distributor of messages holding the list of outbound pipes. dist_t dist; @@ -81,6 +84,9 @@ namespace xs typedef std::deque <blob_t> pending_t; pending_t pending; + // Different values stored while filter extensions are being executed. + int tmp_filter_id; + xpub_t (const xpub_t&); const xpub_t &operator = (const xpub_t&); }; |