diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2012-04-13 09:34:13 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2012-04-14 05:21:09 +0200 |
commit | 19894e0a1b6fbbcb62028fc6513ef3904a6f5c76 (patch) | |
tree | 365270e76f29acca4d60f66773c3ec375e413a85 /src/xsub.hpp | |
parent | 4f120cb103db3987e01ece48648c844218b91ff2 (diff) |
Separate subscription forwarding from SUB-side filtering
- subscription forwarding is handled by XSUB socket
- filtering is handled by SUB sockets
- subscriptions are decoupled from filter engines
- filter doesn't have to be able to enumarate the subscriptions
(no sf_enumerate function)
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xsub.hpp')
-rw-r--r-- | src/xsub.hpp | 33 |
1 files changed, 6 insertions, 27 deletions
diff --git a/src/xsub.hpp b/src/xsub.hpp index 4621570..f8296d5 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -21,7 +21,7 @@ #ifndef __XS_XSUB_HPP_INCLUDED__ #define __XS_XSUB_HPP_INCLUDED__ -#include <vector> +#include <map> #include "../include/xs.h" @@ -60,11 +60,8 @@ namespace xs private: - // Overloads from core_t class. - int filter_subscribed (const unsigned char *data_, size_t size_); - - // Check whether the message matches at least one subscription. - bool match (xs::msg_t *msg_); + void send_subscription (pipe_t *pipe_, bool subscribe_, int filter_id_, + const unsigned char *data_, size_t size_); // Fair queueing object for inbound pipes. fq_t fq; @@ -72,27 +69,9 @@ namespace xs // Object for distributing the subscriptions upstream. dist_t dist; - // The repository of subscriptions. - struct filter_t - { - xs_filter_t *type; - void *instance; - }; - typedef std::vector <filter_t> filters_t; - filters_t filters; - - // If true, 'message' contains a matching message to return on the - // next recv call. - bool has_message; - msg_t message; - - // If true, part of a multipart message was already received, but - // there are following parts still waiting. - bool more; - - // Different values stored while filter extensions are being executed. - pipe_t *tmp_pipe; - int tmp_filter_id; + // Cache of all subscriptions in place at the moment. + typedef std::map <std::pair <int, blob_t>, int> subscriptions_t; + subscriptions_t subscriptions; xsub_t (const xsub_t&); const xsub_t &operator = (const xsub_t&); |