summaryrefslogtreecommitdiff
path: root/src/xpub.hpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-04-05 07:32:58 +0200
committerMartin Sustrik <sustrik@250bpm.com>2012-04-05 07:32:58 +0200
commitd82cbb3a81f116cd22e9895ecac36ac3d7b38929 (patch)
tree03c923311b937f550bec325d131476513a02bebf /src/xpub.hpp
parent52b8a917deb2990e7197b82e81e0258ebe30f424 (diff)
XS_PLUGIN and XS_FILTER implementation
This patch introduces following features: - XS_PLUGIN context option to add plugins to libxs - XS_FILTER option to switch between different filter types - Automatic loading of plug-ins is *not* implemented. From the implementation point of view: - standard prefix filter is implemented as a pluggable filter - trie_t and mtrie_t are joined into a single class - the code for 0MQ/3.1 compatibility is left in in the form of comments - new test for testing re-subscriptions is added Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xpub.hpp')
-rw-r--r--src/xpub.hpp30
1 files changed, 18 insertions, 12 deletions
diff --git a/src/xpub.hpp b/src/xpub.hpp
index 7f29dcc..c0e47ff 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -21,15 +21,17 @@
#ifndef __XS_XPUB_HPP_INCLUDED__
#define __XS_XPUB_HPP_INCLUDED__
+#include "../include/xs.h"
+
#include <deque>
#include <string>
#include "socket_base.hpp"
#include "session_base.hpp"
-#include "mtrie.hpp"
#include "array.hpp"
#include "dist.hpp"
#include "blob.hpp"
+#include "core.hpp"
namespace xs
{
@@ -39,8 +41,7 @@ namespace xs
class pipe_t;
class io_thread_t;
- class xpub_t :
- public socket_base_t
+ class xpub_t : public socket_base_t, public core_t
{
public:
@@ -59,16 +60,18 @@ namespace xs
private:
- // Function to be applied to the trie to send all the subsciptions
- // upstream.
- static void send_unsubscription (unsigned char *data_, size_t size_,
- void *arg_);
-
- // Function to be applied to each matching pipes.
- static void mark_as_matching (xs::pipe_t *pipe_, void *arg_);
+ // Overloaded functions from core_t.
+ int filter_unsubscribed (const unsigned char *data_, size_t size_);
+ int filter_matching (void *subscriber_);
- // List of all subscriptions mapped to corresponding pipes.
- mtrie_t subscriptions;
+ // The repository of subscriptions.
+ struct filter_t
+ {
+ xs_filter_t *type;
+ void *instance;
+ };
+ typedef std::vector <filter_t> filters_t;
+ filters_t filters;
// Distributor of messages holding the list of outbound pipes.
dist_t dist;
@@ -81,6 +84,9 @@ namespace xs
typedef std::deque <blob_t> pending_t;
pending_t pending;
+ // Different values stored while filter extensions are being executed.
+ int tmp_filter_id;
+
xpub_t (const xpub_t&);
const xpub_t &operator = (const xpub_t&);
};