summaryrefslogtreecommitdiff
path: root/src/xsub.hpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-04-05 07:32:58 +0200
committerMartin Sustrik <sustrik@250bpm.com>2012-04-05 07:32:58 +0200
commitd82cbb3a81f116cd22e9895ecac36ac3d7b38929 (patch)
tree03c923311b937f550bec325d131476513a02bebf /src/xsub.hpp
parent52b8a917deb2990e7197b82e81e0258ebe30f424 (diff)
XS_PLUGIN and XS_FILTER implementation
This patch introduces following features: - XS_PLUGIN context option to add plugins to libxs - XS_FILTER option to switch between different filter types - Automatic loading of plug-ins is *not* implemented. From the implementation point of view: - standard prefix filter is implemented as a pluggable filter - trie_t and mtrie_t are joined into a single class - the code for 0MQ/3.1 compatibility is left in in the form of comments - new test for testing re-subscriptions is added Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xsub.hpp')
-rw-r--r--src/xsub.hpp29
1 files changed, 20 insertions, 9 deletions
diff --git a/src/xsub.hpp b/src/xsub.hpp
index 07fe026..4621570 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -21,11 +21,15 @@
#ifndef __XS_XSUB_HPP_INCLUDED__
#define __XS_XSUB_HPP_INCLUDED__
+#include <vector>
+
+#include "../include/xs.h"
+
#include "socket_base.hpp"
#include "session_base.hpp"
#include "dist.hpp"
+#include "core.hpp"
#include "fq.hpp"
-#include "trie.hpp"
namespace xs
{
@@ -34,8 +38,7 @@ namespace xs
class pipe_t;
class io_thread_t;
- class xsub_t :
- public socket_base_t
+ class xsub_t : public socket_base_t, public core_t
{
public:
@@ -57,14 +60,12 @@ namespace xs
private:
+ // Overloads from core_t class.
+ int filter_subscribed (const unsigned char *data_, size_t size_);
+
// Check whether the message matches at least one subscription.
bool match (xs::msg_t *msg_);
- // Function to be applied to the trie to send all the subsciptions
- // upstream.
- static void send_subscription (unsigned char *data_, size_t size_,
- void *arg_);
-
// Fair queueing object for inbound pipes.
fq_t fq;
@@ -72,7 +73,13 @@ namespace xs
dist_t dist;
// The repository of subscriptions.
- trie_t subscriptions;
+ struct filter_t
+ {
+ xs_filter_t *type;
+ void *instance;
+ };
+ typedef std::vector <filter_t> filters_t;
+ filters_t filters;
// If true, 'message' contains a matching message to return on the
// next recv call.
@@ -83,6 +90,10 @@ namespace xs
// there are following parts still waiting.
bool more;
+ // Different values stored while filter extensions are being executed.
+ pipe_t *tmp_pipe;
+ int tmp_filter_id;
+
xsub_t (const xsub_t&);
const xsub_t &operator = (const xsub_t&);
};