summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-04-13 09:34:13 +0200
committerMartin Sustrik <sustrik@250bpm.com>2012-04-14 05:21:09 +0200
commit19894e0a1b6fbbcb62028fc6513ef3904a6f5c76 (patch)
tree365270e76f29acca4d60f66773c3ec375e413a85
parent4f120cb103db3987e01ece48648c844218b91ff2 (diff)
Separate subscription forwarding from SUB-side filtering
- subscription forwarding is handled by XSUB socket - filtering is handled by SUB sockets - subscriptions are decoupled from filter engines - filter doesn't have to be able to enumarate the subscriptions (no sf_enumerate function) Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--include/xs.h1
-rw-r--r--src/options.cpp7
-rw-r--r--src/options.hpp5
-rw-r--r--src/prefix_filter.cpp51
-rw-r--r--src/sub.cpp150
-rw-r--r--src/sub.hpp27
-rw-r--r--src/xsub.cpp222
-rw-r--r--src/xsub.hpp33
8 files changed, 213 insertions, 283 deletions
diff --git a/include/xs.h b/include/xs.h
index e44587e..17a557b 100644
--- a/include/xs.h
+++ b/include/xs.h
@@ -294,7 +294,6 @@ typedef struct
const unsigned char *data, size_t size);
int (*sf_unsubscribe) (void *core, void *sf,
const unsigned char *data, size_t size);
- void (*sf_enumerate) (void *core, void *sf);
int (*sf_match) (void *core, void *sf,
const unsigned char *data, size_t size);
diff --git a/src/options.cpp b/src/options.cpp
index fdbffde..2a72add 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -49,10 +49,9 @@ xs::options_t::options_t () :
ipv4only (1),
keepalive (0),
protocol (0),
- filter_id (XS_FILTER_PREFIX),
+ filter (XS_FILTER_PREFIX),
delay_on_close (true),
delay_on_disconnect (true),
- filter (false),
send_identity (false),
recv_identity (false),
socket_id (0)
@@ -256,7 +255,7 @@ int xs::options_t::setsockopt (int option_, const void *optval_,
errno = EINVAL;
return -1;
}
- filter_id = *((int*) optval_);
+ filter = *((int*) optval_);
return 0;
}
@@ -454,7 +453,7 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
errno = EINVAL;
return -1;
}
- *((int*) optval_) = filter_id;
+ *((int*) optval_) = filter;
*optvallen_ = sizeof (int);
return 0;
diff --git a/src/options.hpp b/src/options.hpp
index 1288f72..fac6084 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -96,7 +96,7 @@ namespace xs
int protocol;
// Filter ID to be used with subscriptions and unsubscriptions.
- int filter_id;
+ int filter;
// If true, session reads all the pending messages from the pipe and
// sends them to the network when socket is closed.
@@ -106,9 +106,6 @@ namespace xs
// them to the user when the peer terminates.
bool delay_on_disconnect;
- // If 1, (X)SUB socket should filter the messages. If 0, it should not.
- bool filter;
-
// Sends identity to all new connections.
bool send_identity;
diff --git a/src/prefix_filter.cpp b/src/prefix_filter.cpp
index d13c5b4..0514763 100644
--- a/src/prefix_filter.cpp
+++ b/src/prefix_filter.cpp
@@ -422,43 +422,6 @@ static bool pfx_rm (pfx_node_t *node_, const unsigned char *prefix_,
return ret;
}
-static void pfx_list (pfx_node_t *node_, unsigned char **buff_,
- size_t buffsize_, size_t maxbuffsize_, void *arg_)
-{
- // If this node is a subscription, apply the function.
- if (node_->subscribers) {
- int rc = xs_filter_subscribed (arg_, *buff_, buffsize_);
- errno_assert (rc == 0);
- }
-
- // Adjust the buffer.
- if (buffsize_ >= maxbuffsize_) {
- maxbuffsize_ = buffsize_ + 256;
- *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
- xs_assert (*buff_);
- }
-
- // If there are no subnodes in the trie, return.
- if (node_->count == 0)
- return;
-
- // If there's one subnode (optimisation).
- if (node_->count == 1) {
- (*buff_) [buffsize_] = node_->min;
- buffsize_++;
- pfx_list (node_->next.node, buff_, buffsize_, maxbuffsize_, arg_);
- return;
- }
-
- // If there are multiple subnodes.
- for (unsigned short c = 0; c != node_->count; c++) {
- (*buff_) [buffsize_] = node_->min + c;
- if (node_->next.table [c])
- pfx_list (node_->next.table [c], buff_, buffsize_ + 1,
- maxbuffsize_, arg_);
- }
-}
-
// Implementation of the public filter interface.
static int id (void *core_)
@@ -562,20 +525,15 @@ static void sf_destroy (void *core_, void *sf_)
static int sf_subscribe (void *core_, void *sf_,
const unsigned char *data_, size_t size_)
{
- return pfx_add ((pfx_node_t*) sf_, data_, size_, NULL) ? 1 : 0;
+ pfx_add ((pfx_node_t*) sf_, data_, size_, NULL);
+ return 0;
}
static int sf_unsubscribe (void *core_, void *sf_,
const unsigned char *data_, size_t size_)
{
- return pfx_rm ((pfx_node_t*) sf_, data_, size_, NULL) ? 1 : 0;
-}
-
-static void sf_enumerate (void *core_, void *sf_)
-{
- unsigned char *buff = NULL;
- pfx_list ((pfx_node_t*) sf_, &buff, 0, 0, core_);
- free (buff);
+ pfx_rm ((pfx_node_t*) sf_, data_, size_, NULL);
+ return 0;
}
static int sf_match (void *core_, void *sf_,
@@ -627,7 +585,6 @@ static xs_filter_t pfx_filter = {
sf_destroy,
sf_subscribe,
sf_unsubscribe,
- sf_enumerate,
sf_match,
};
diff --git a/src/sub.cpp b/src/sub.cpp
index 442dd9b..2aa3901 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -24,17 +24,24 @@
#include "wire.hpp"
xs::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
- xsub_t (parent_, tid_, sid_)
+ xsub_t (parent_, tid_, sid_),
+ more (false),
+ has_message (false)
{
options.type = XS_SUB;
- // Switch filtering messages on (as opposed to XSUB which where the
- // filtering is off).
- options.filter = true;
+ int rc = message.init ();
+ errno_assert (rc == 0);
}
xs::sub_t::~sub_t ()
{
+ // Deallocate all the filters.
+ for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it)
+ it->type->sf_destroy ((void*) (core_t*) this, it->instance);
+
+ int rc = message.close ();
+ errno_assert (rc == 0);
}
int xs::sub_t::xsetsockopt (int option_, const void *optval_,
@@ -50,6 +57,36 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_,
return -1;
}
+ // Find the relevant filter.
+ filters_t::iterator it;
+ for (it = filters.begin (); it != filters.end (); ++it)
+ if (it->type->id (NULL) == options.filter)
+ break;
+
+ // Process the subscription. If the filter of the specified type does not
+ // exist yet, create it.
+ if (option_ == XS_SUBSCRIBE) {
+ if (it == filters.end ()) {
+ filter_t f;
+ f.type = get_filter (options.filter);
+ xs_assert (f.type);
+ f.instance = f.type->sf_create ((void*) (core_t*) this);
+ xs_assert (f.instance);
+ filters.push_back (f);
+ it = filters.end () - 1;
+ }
+ int rc = it->type->sf_subscribe ((void*) (core_t*) this, it->instance,
+ (const unsigned char*) optval_, optvallen_);
+ errno_assert (rc == 0);
+ }
+ else if (option_ == XS_UNSUBSCRIBE) {
+ xs_assert (it != filters.end ());
+ int rc = it->type->sf_unsubscribe ((void*) (core_t*) this, it->instance,
+ (const unsigned char*) optval_, optvallen_);
+ errno_assert (rc == 0);
+ }
+
+
// Create the subscription message.
msg_t msg;
int rc = msg.init_size (optvallen_ + 4);
@@ -59,22 +96,9 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_,
put_uint16 (data, XS_CMD_SUBSCRIBE);
else if (option_ == XS_UNSUBSCRIBE)
put_uint16 (data, XS_CMD_UNSUBSCRIBE);
- put_uint16 (data + 2, options.filter_id);
+ put_uint16 (data + 2, options.filter);
memcpy (data + 4, optval_, optvallen_);
-#if 0
- // TODO: This is 0MQ/3.1 protocol.
- msg_t msg;
- int rc = msg.init_size (optvallen_ + 1);
- errno_assert (rc == 0);
- unsigned char *data = (unsigned char*) msg.data ();
- if (option_ == XS_SUBSCRIBE)
- data [0] = 1;
- else if (option_ == XS_UNSUBSCRIBE)
- data [0] = 0;
- memcpy (data + 1, optval_, optvallen_);
-#endif
-
// Pass it further on in the stack.
int err = 0;
rc = xsub_t::xsend (&msg, 0);
@@ -94,12 +118,102 @@ int xs::sub_t::xsend (msg_t *msg_, int flags_)
return -1;
}
+int xs::sub_t::xrecv (msg_t *msg_, int flags_)
+{
+ // If there's already a message prepared by a previous call to xs_poll,
+ // return it straight ahead.
+ if (has_message) {
+ int rc = msg_->move (message);
+ errno_assert (rc == 0);
+ has_message = false;
+ more = msg_->flags () & msg_t::more ? true : false;
+ return 0;
+ }
+
+ // TODO: This can result in infinite loop in the case of continuous
+ // stream of non-matching messages which breaks the non-blocking recv
+ // semantics.
+ while (true) {
+
+ // Get a message using fair queueing algorithm.
+ int rc = xsub_t::xrecv (msg_, flags_);
+
+ // If there's no message available, return immediately.
+ // The same when error occurs.
+ if (rc != 0)
+ return -1;
+
+ // Check whether the message matches at least one subscription.
+ // Non-initial parts of the message are passed automatically.
+ if (more || match (msg_)) {
+ more = msg_->flags () & msg_t::more ? true : false;
+ return 0;
+ }
+
+ // Message doesn't match. Pop any remaining parts of the message
+ // from the pipe.
+ while (msg_->flags () & msg_t::more) {
+ rc = xsub_t::xrecv (msg_, XS_DONTWAIT);
+ xs_assert (rc == 0);
+ }
+ }
+}
+
+bool xs::sub_t::xhas_in ()
+{
+ // There are subsequent parts of the partly-read message available.
+ if (more)
+ return true;
+
+ // If there's already a message prepared by a previous call to xs_poll,
+ // return straight ahead.
+ if (has_message)
+ return true;
+
+ // TODO: This can result in infinite loop in the case of continuous
+ // stream of non-matching messages.
+ while (true) {
+
+ // Get a message using fair queueing algorithm.
+ int rc = xsub_t::xrecv (&message, XS_DONTWAIT);
+
+ // If there's no message available, return immediately.
+ // The same when error occurs.
+ if (rc != 0) {
+ xs_assert (errno == EAGAIN);
+ return false;
+ }
+
+ // Check whether the message matches at least one subscription.
+ if (match (&message)) {
+ has_message = true;
+ return true;
+ }
+
+ // Message doesn't match. Pop any remaining parts of the message
+ // from the pipe.
+ while (message.flags () & msg_t::more) {
+ rc = xsub_t::xrecv (&message, XS_DONTWAIT);
+ xs_assert (rc == 0);
+ }
+ }
+}
+
bool xs::sub_t::xhas_out ()
{
// Overload the XSUB's send.
return false;
}
+bool xs::sub_t::match (msg_t *msg_)
+{
+ for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it)
+ if (it->type->sf_match ((void*) (core_t*) this, it->instance,
+ (unsigned char*) msg_->data (), msg_->size ()))
+ return true;
+ return false;
+}
+
xs::sub_session_t::sub_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
diff --git a/src/sub.hpp b/src/sub.hpp
index 47b1877..4c3bf7f 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -22,6 +22,10 @@
#ifndef __XS_SUB_HPP_INCLUDED__
#define __XS_SUB_HPP_INCLUDED__
+#include <vector>
+
+#include "../include/xs.h"
+
#include "xsub.hpp"
namespace xs
@@ -43,10 +47,33 @@ namespace xs
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (xs::msg_t *msg_, int flags_);
+ int xrecv (xs::msg_t *msg_, int flags_);
+ bool xhas_in ();
bool xhas_out ();
+ // The repository of subscriptions.
+ struct filter_t
+ {
+ xs_filter_t *type;
+ void *instance;
+ };
+ typedef std::vector <filter_t> filters_t;
+ filters_t filters;
+
+ // If true, part of a multipart message was already received, but
+ // there are following parts still waiting.
+ bool more;
+
+ // If true, 'message' contains a matching message to return on the
+ // next recv call.
+ bool has_message;
+ msg_t message;
+
private:
+ // Check whether the message matches at least one subscription.
+ bool match (xs::msg_t *msg_);
+
sub_t (const sub_t&);
const sub_t &operator = (const sub_t&);
};
diff --git a/src/xsub.cpp b/src/xsub.cpp
index da56586..3afd734 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -26,11 +26,7 @@
#include "wire.hpp"
xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
- socket_base_t (parent_, tid_, sid_),
- has_message (false),
- more (false),
- tmp_pipe (NULL),
- tmp_filter_id (-1)
+ socket_base_t (parent_, tid_, sid_)
{
options.type = XS_XSUB;
@@ -40,19 +36,10 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
// Also, we want the subscription buffer to be elastic by default.
options.sndhwm = 0;
-
- int rc = message.init ();
- errno_assert (rc == 0);
}
xs::xsub_t::~xsub_t ()
{
- // Deallocate all the filters.
- for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it)
- it->type->sf_destroy ((void*) (core_t*) this, it->instance);
-
- int rc = message.close ();
- errno_assert (rc == 0);
}
void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
@@ -62,19 +49,17 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
// Pipes with 0MQ/2.1-style protocol are not eligible for accepting
// subscriptions.
- if (pipe_->get_protocol () != 1)
- dist.attach (pipe_);
+ if (pipe_->get_protocol () == 1)
+ return;
+
+ dist.attach (pipe_);
// Send all the cached subscriptions to the new upstream peer.
- tmp_pipe = pipe_;
- for (filters_t::iterator it = filters.begin (); it != filters.end ();
- ++it) {
- tmp_filter_id = it->type->id (NULL);
- it->type->sf_enumerate ((void*) (core_t*) this, it->instance);
- tmp_filter_id = -1;
- }
+ for (subscriptions_t::iterator its = subscriptions.begin ();
+ its != subscriptions.end (); ++its)
+ send_subscription (pipe_, true, its->first.first,
+ its->first.second.data (), its->first.second.size ());
pipe_->flush ();
- tmp_pipe = NULL;
}
void xs::xsub_t::xread_activated (pipe_t *pipe_)
@@ -96,18 +81,16 @@ void xs::xsub_t::xterminated (pipe_t *pipe_)
void xs::xsub_t::xhiccuped (pipe_t *pipe_)
{
- // Send all the cached subscriptions to the hiccuped pipe.
- if (pipe_->get_protocol () != 1) {
- tmp_pipe = pipe_;
- for (filters_t::iterator it = filters.begin (); it != filters.end ();
- ++it) {
- tmp_filter_id = it->type->id (NULL);
- it->type->sf_enumerate ((void*) (core_t*) this, it->instance);
- tmp_filter_id = -1;
- }
- pipe_->flush ();
- tmp_pipe = NULL;
- }
+ // In 0MQ/2.1 protocol there is no subscription forwarding.
+ if (pipe_->get_protocol () == 1)
+ return;
+
+ // Send all the cached subscriptions to the new upstream peer.
+ for (subscriptions_t::iterator its = subscriptions.begin ();
+ its != subscriptions.end (); ++its)
+ send_subscription (pipe_, true, its->first.first,
+ its->first.second.data (), its->first.second.size ());
+ pipe_->flush ();
}
int xs::xsub_t::xsend (msg_t *msg_, int flags_)
@@ -121,66 +104,33 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)
}
int cmd = get_uint16 (data);
int filter_id = get_uint16 (data + 2);
-
-#if 0
- // TODO: This is 0MQ/3.1 protocol.
- if (size < 1) {
- errno = EINVAL;
- return -1;
- }
- int cmd = data [0] ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE;
- int filter_id = XS_FILTER_PREFIX;
-#endif
-
if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
errno = EINVAL;
return -1;
}
-
- // Find the relevant filter.
- filters_t::iterator it;
- for (it = filters.begin (); it != filters.end (); ++it)
- if (it->type->id (NULL) == filter_id)
- break;
// Process the subscription.
if (cmd == XS_CMD_SUBSCRIBE) {
-
- // If the filter of the specified type does not exist yet, create it.
- if (it == filters.end ()) {
- filter_t f;
- f.type = get_filter (filter_id);
- xs_assert (f.type);
- f.instance = f.type->sf_create ((void*) (core_t*) this);
- xs_assert (f.instance);
- filters.push_back (f);
- it = filters.end () - 1;
- }
-
- if (it->type->sf_subscribe ((void*) (core_t*) this,
- it->instance, data + 4, size - 4) == 1)
-#if 0
- // TODO: This is 0MQ/3.1 protocol.
- if (it->type->sf_subscribe ((void*) (core_t*) this,
- it->instance, data + 1, size - 1) == 1)
-#endif
+ subscriptions_t::iterator it = subscriptions.insert (
+ std::make_pair (std::make_pair (filter_id,
+ blob_t (data + 4, size - 4)), 0)).first;
+ ++it->second;
+ if (it->second == 1)
return dist.send_to_all (msg_, flags_);
else
return 0;
}
else if (cmd == XS_CMD_UNSUBSCRIBE) {
- xs_assert (it != filters.end ());
-
- if (it->type->sf_unsubscribe ((void*) (core_t*) this,
- it->instance, data + 4, size - 4) == 1)
-#if 0
- // TODO: This is 0MQ/3.1 protocol.
- if (it->type->sf_unsubscribe ((void*) (core_t*) this,
- it->instance, data + 1, size - 1) == 1)
-#endif
- return dist.send_to_all (msg_, flags_);
- else
+ subscriptions_t::iterator it = subscriptions.find (
+ std::make_pair (filter_id, blob_t (data + 4, size - 4)));
+ if (it == subscriptions.end ())
return 0;
+ xs_assert (it->second);
+ --it->second;
+ if (it->second)
+ return 0;
+ subscriptions.erase (it);
+ return dist.send_to_all (msg_, flags_);
}
xs_assert (false);
@@ -195,118 +145,28 @@ bool xs::xsub_t::xhas_out ()
int xs::xsub_t::xrecv (msg_t *msg_, int flags_)
{
- // If there's already a message prepared by a previous call to xs_poll,
- // return it straight ahead.
- if (has_message) {
- int rc = msg_->move (message);
- errno_assert (rc == 0);
- has_message = false;
- more = msg_->flags () & msg_t::more ? true : false;
- return 0;
- }
-
- // TODO: This can result in infinite loop in the case of continuous
- // stream of non-matching messages which breaks the non-blocking recv
- // semantics.
- while (true) {
-
- // Get a message using fair queueing algorithm.
- int rc = fq.recv (msg_, flags_);
-
- // If there's no message available, return immediately.
- // The same when error occurs.
- if (rc != 0)
- return -1;
-
- // Check whether the message matches at least one subscription.
- // Non-initial parts of the message are passed
- if (more || !options.filter || match (msg_)) {
- more = msg_->flags () & msg_t::more ? true : false;
- return 0;
- }
-
- // Message doesn't match. Pop any remaining parts of the message
- // from the pipe.
- while (msg_->flags () & msg_t::more) {
- rc = fq.recv (msg_, XS_DONTWAIT);
- xs_assert (rc == 0);
- }
- }
+ return fq.recv (msg_, flags_);
}
bool xs::xsub_t::xhas_in ()
{
- // There are subsequent parts of the partly-read message available.
- if (more)
- return true;
-
- // If there's already a message prepared by a previous call to xs_poll,
- // return straight ahead.
- if (has_message)
- return true;
-
- // TODO: This can result in infinite loop in the case of continuous
- // stream of non-matching messages.
- while (true) {
-
- // Get a message using fair queueing algorithm.
- int rc = fq.recv (&message, XS_DONTWAIT);
-
- // If there's no message available, return immediately.
- // The same when error occurs.
- if (rc != 0) {
- xs_assert (errno == EAGAIN);
- return false;
- }
-
- // Check whether the message matches at least one subscription.
- if (!options.filter || match (&message)) {
- has_message = true;
- return true;
- }
-
- // Message doesn't match. Pop any remaining parts of the message
- // from the pipe.
- while (message.flags () & msg_t::more) {
- rc = fq.recv (&message, XS_DONTWAIT);
- xs_assert (rc == 0);
- }
- }
+ return fq.has_in ();
}
-bool xs::xsub_t::match (msg_t *msg_)
-{
- for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it)
- if (it->type->sf_match ((void*) (core_t*) this, it->instance,
- (unsigned char*) msg_->data (), msg_->size ()))
- return true;
- return false;
-}
-
-int xs::xsub_t::filter_subscribed (const unsigned char *data_, size_t size_)
+void xs::xsub_t::send_subscription (pipe_t *pipe_, bool subscribe_,
+ int filter_id_, const unsigned char *data_, size_t size_)
{
// Create the subsctription message.
msg_t msg;
int rc = msg.init_size (size_ + 4);
xs_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
- put_uint16 (data, XS_CMD_SUBSCRIBE);
- put_uint16 (data + 2, tmp_filter_id);
+ put_uint16 (data, subscribe_ ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE);
+ put_uint16 (data + 2, filter_id_);
memcpy (data + 4, data_, size_);
-#if 0
- // TODO: This is 0MQ/3.1 protocol.
- xs_assert (tmp_filter_id == XS_FILTER_PREFIX);
- msg_t msg;
- int rc = msg.init_size (size_ + 1);
- xs_assert (rc == 0);
- unsigned char *data = (unsigned char*) msg.data ();
- data [0] = 1;
- memcpy (data + 1, data_, size_);
-#endif
-
// Send it to the pipe.
- bool sent = tmp_pipe->write (&msg);
+ bool sent = pipe_->write (&msg);
// If we reached the SNDHWM, and thus cannot send the subscription, drop
// the subscription message instead. This matches the behaviour of
@@ -314,8 +174,6 @@ int xs::xsub_t::filter_subscribed (const unsigned char *data_, size_t size_)
// when the SNDHWM is reached.
if (!sent)
msg.close ();
-
- return 0;
}
xs::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_,
diff --git a/src/xsub.hpp b/src/xsub.hpp
index 4621570..f8296d5 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -21,7 +21,7 @@
#ifndef __XS_XSUB_HPP_INCLUDED__
#define __XS_XSUB_HPP_INCLUDED__
-#include <vector>
+#include <map>
#include "../include/xs.h"
@@ -60,11 +60,8 @@ namespace xs
private:
- // Overloads from core_t class.
- int filter_subscribed (const unsigned char *data_, size_t size_);
-
- // Check whether the message matches at least one subscription.
- bool match (xs::msg_t *msg_);
+ void send_subscription (pipe_t *pipe_, bool subscribe_, int filter_id_,
+ const unsigned char *data_, size_t size_);
// Fair queueing object for inbound pipes.
fq_t fq;
@@ -72,27 +69,9 @@ namespace xs
// Object for distributing the subscriptions upstream.
dist_t dist;
- // The repository of subscriptions.
- struct filter_t
- {
- xs_filter_t *type;
- void *instance;
- };
- typedef std::vector <filter_t> filters_t;
- filters_t filters;
-
- // If true, 'message' contains a matching message to return on the
- // next recv call.
- bool has_message;
- msg_t message;
-
- // If true, part of a multipart message was already received, but
- // there are following parts still waiting.
- bool more;
-
- // Different values stored while filter extensions are being executed.
- pipe_t *tmp_pipe;
- int tmp_filter_id;
+ // Cache of all subscriptions in place at the moment.
+ typedef std::map <std::pair <int, blob_t>, int> subscriptions_t;
+ subscriptions_t subscriptions;
xsub_t (const xsub_t&);
const xsub_t &operator = (const xsub_t&);