summaryrefslogtreecommitdiff
path: root/src/xsub.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/xsub.hpp')
-rw-r--r--src/xsub.hpp29
1 files changed, 20 insertions, 9 deletions
diff --git a/src/xsub.hpp b/src/xsub.hpp
index 07fe026..4621570 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -21,11 +21,15 @@
#ifndef __XS_XSUB_HPP_INCLUDED__
#define __XS_XSUB_HPP_INCLUDED__
+#include <vector>
+
+#include "../include/xs.h"
+
#include "socket_base.hpp"
#include "session_base.hpp"
#include "dist.hpp"
+#include "core.hpp"
#include "fq.hpp"
-#include "trie.hpp"
namespace xs
{
@@ -34,8 +38,7 @@ namespace xs
class pipe_t;
class io_thread_t;
- class xsub_t :
- public socket_base_t
+ class xsub_t : public socket_base_t, public core_t
{
public:
@@ -57,14 +60,12 @@ namespace xs
private:
+ // Overloads from core_t class.
+ int filter_subscribed (const unsigned char *data_, size_t size_);
+
// Check whether the message matches at least one subscription.
bool match (xs::msg_t *msg_);
- // Function to be applied to the trie to send all the subsciptions
- // upstream.
- static void send_subscription (unsigned char *data_, size_t size_,
- void *arg_);
-
// Fair queueing object for inbound pipes.
fq_t fq;
@@ -72,7 +73,13 @@ namespace xs
dist_t dist;
// The repository of subscriptions.
- trie_t subscriptions;
+ struct filter_t
+ {
+ xs_filter_t *type;
+ void *instance;
+ };
+ typedef std::vector <filter_t> filters_t;
+ filters_t filters;
// If true, 'message' contains a matching message to return on the
// next recv call.
@@ -83,6 +90,10 @@ namespace xs
// there are following parts still waiting.
bool more;
+ // Different values stored while filter extensions are being executed.
+ pipe_t *tmp_pipe;
+ int tmp_filter_id;
+
xsub_t (const xsub_t&);
const xsub_t &operator = (const xsub_t&);
};