summaryrefslogtreecommitdiff
path: root/src/xsub.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-04-13 09:34:13 +0200
committerMartin Sustrik <sustrik@250bpm.com>2012-04-14 05:21:09 +0200
commit19894e0a1b6fbbcb62028fc6513ef3904a6f5c76 (patch)
tree365270e76f29acca4d60f66773c3ec375e413a85 /src/xsub.cpp
parent4f120cb103db3987e01ece48648c844218b91ff2 (diff)
Separate subscription forwarding from SUB-side filtering
- subscription forwarding is handled by XSUB socket - filtering is handled by SUB sockets - subscriptions are decoupled from filter engines - filter doesn't have to be able to enumarate the subscriptions (no sf_enumerate function) Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xsub.cpp')
-rw-r--r--src/xsub.cpp222
1 files changed, 40 insertions, 182 deletions
diff --git a/src/xsub.cpp b/src/xsub.cpp
index da56586..3afd734 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -26,11 +26,7 @@
#include "wire.hpp"
xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
- socket_base_t (parent_, tid_, sid_),
- has_message (false),
- more (false),
- tmp_pipe (NULL),
- tmp_filter_id (-1)
+ socket_base_t (parent_, tid_, sid_)
{
options.type = XS_XSUB;
@@ -40,19 +36,10 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
// Also, we want the subscription buffer to be elastic by default.
options.sndhwm = 0;
-
- int rc = message.init ();
- errno_assert (rc == 0);
}
xs::xsub_t::~xsub_t ()
{
- // Deallocate all the filters.
- for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it)
- it->type->sf_destroy ((void*) (core_t*) this, it->instance);
-
- int rc = message.close ();
- errno_assert (rc == 0);
}
void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
@@ -62,19 +49,17 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
// Pipes with 0MQ/2.1-style protocol are not eligible for accepting
// subscriptions.
- if (pipe_->get_protocol () != 1)
- dist.attach (pipe_);
+ if (pipe_->get_protocol () == 1)
+ return;
+
+ dist.attach (pipe_);
// Send all the cached subscriptions to the new upstream peer.
- tmp_pipe = pipe_;
- for (filters_t::iterator it = filters.begin (); it != filters.end ();
- ++it) {
- tmp_filter_id = it->type->id (NULL);
- it->type->sf_enumerate ((void*) (core_t*) this, it->instance);
- tmp_filter_id = -1;
- }
+ for (subscriptions_t::iterator its = subscriptions.begin ();
+ its != subscriptions.end (); ++its)
+ send_subscription (pipe_, true, its->first.first,
+ its->first.second.data (), its->first.second.size ());
pipe_->flush ();
- tmp_pipe = NULL;
}
void xs::xsub_t::xread_activated (pipe_t *pipe_)
@@ -96,18 +81,16 @@ void xs::xsub_t::xterminated (pipe_t *pipe_)
void xs::xsub_t::xhiccuped (pipe_t *pipe_)
{
- // Send all the cached subscriptions to the hiccuped pipe.
- if (pipe_->get_protocol () != 1) {
- tmp_pipe = pipe_;
- for (filters_t::iterator it = filters.begin (); it != filters.end ();
- ++it) {
- tmp_filter_id = it->type->id (NULL);
- it->type->sf_enumerate ((void*) (core_t*) this, it->instance);
- tmp_filter_id = -1;
- }
- pipe_->flush ();
- tmp_pipe = NULL;
- }
+ // In 0MQ/2.1 protocol there is no subscription forwarding.
+ if (pipe_->get_protocol () == 1)
+ return;
+
+ // Send all the cached subscriptions to the new upstream peer.
+ for (subscriptions_t::iterator its = subscriptions.begin ();
+ its != subscriptions.end (); ++its)
+ send_subscription (pipe_, true, its->first.first,
+ its->first.second.data (), its->first.second.size ());
+ pipe_->flush ();
}
int xs::xsub_t::xsend (msg_t *msg_, int flags_)
@@ -121,66 +104,33 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)
}
int cmd = get_uint16 (data);
int filter_id = get_uint16 (data + 2);
-
-#if 0
- // TODO: This is 0MQ/3.1 protocol.
- if (size < 1) {
- errno = EINVAL;
- return -1;
- }
- int cmd = data [0] ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE;
- int filter_id = XS_FILTER_PREFIX;
-#endif
-
if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
errno = EINVAL;
return -1;
}
-
- // Find the relevant filter.
- filters_t::iterator it;
- for (it = filters.begin (); it != filters.end (); ++it)
- if (it->type->id (NULL) == filter_id)
- break;
// Process the subscription.
if (cmd == XS_CMD_SUBSCRIBE) {
-
- // If the filter of the specified type does not exist yet, create it.
- if (it == filters.end ()) {
- filter_t f;
- f.type = get_filter (filter_id);
- xs_assert (f.type);
- f.instance = f.type->sf_create ((void*) (core_t*) this);
- xs_assert (f.instance);
- filters.push_back (f);
- it = filters.end () - 1;
- }
-
- if (it->type->sf_subscribe ((void*) (core_t*) this,
- it->instance, data + 4, size - 4) == 1)
-#if 0
- // TODO: This is 0MQ/3.1 protocol.
- if (it->type->sf_subscribe ((void*) (core_t*) this,
- it->instance, data + 1, size - 1) == 1)
-#endif
+ subscriptions_t::iterator it = subscriptions.insert (
+ std::make_pair (std::make_pair (filter_id,
+ blob_t (data + 4, size - 4)), 0)).first;
+ ++it->second;
+ if (it->second == 1)
return dist.send_to_all (msg_, flags_);
else
return 0;
}
else if (cmd == XS_CMD_UNSUBSCRIBE) {
- xs_assert (it != filters.end ());
-
- if (it->type->sf_unsubscribe ((void*) (core_t*) this,
- it->instance, data + 4, size - 4) == 1)
-#if 0
- // TODO: This is 0MQ/3.1 protocol.
- if (it->type->sf_unsubscribe ((void*) (core_t*) this,
- it->instance, data + 1, size - 1) == 1)
-#endif
- return dist.send_to_all (msg_, flags_);
- else
+ subscriptions_t::iterator it = subscriptions.find (
+ std::make_pair (filter_id, blob_t (data + 4, size - 4)));
+ if (it == subscriptions.end ())
return 0;
+ xs_assert (it->second);
+ --it->second;
+ if (it->second)
+ return 0;
+ subscriptions.erase (it);
+ return dist.send_to_all (msg_, flags_);
}
xs_assert (false);
@@ -195,118 +145,28 @@ bool xs::xsub_t::xhas_out ()
int xs::xsub_t::xrecv (msg_t *msg_, int flags_)
{
- // If there's already a message prepared by a previous call to xs_poll,
- // return it straight ahead.
- if (has_message) {
- int rc = msg_->move (message);
- errno_assert (rc == 0);
- has_message = false;
- more = msg_->flags () & msg_t::more ? true : false;
- return 0;
- }
-
- // TODO: This can result in infinite loop in the case of continuous
- // stream of non-matching messages which breaks the non-blocking recv
- // semantics.
- while (true) {
-
- // Get a message using fair queueing algorithm.
- int rc = fq.recv (msg_, flags_);
-
- // If there's no message available, return immediately.
- // The same when error occurs.
- if (rc != 0)
- return -1;
-
- // Check whether the message matches at least one subscription.
- // Non-initial parts of the message are passed
- if (more || !options.filter || match (msg_)) {
- more = msg_->flags () & msg_t::more ? true : false;
- return 0;
- }
-
- // Message doesn't match. Pop any remaining parts of the message
- // from the pipe.
- while (msg_->flags () & msg_t::more) {
- rc = fq.recv (msg_, XS_DONTWAIT);
- xs_assert (rc == 0);
- }
- }
+ return fq.recv (msg_, flags_);
}
bool xs::xsub_t::xhas_in ()
{
- // There are subsequent parts of the partly-read message available.
- if (more)
- return true;
-
- // If there's already a message prepared by a previous call to xs_poll,
- // return straight ahead.
- if (has_message)
- return true;
-
- // TODO: This can result in infinite loop in the case of continuous
- // stream of non-matching messages.
- while (true) {
-
- // Get a message using fair queueing algorithm.
- int rc = fq.recv (&message, XS_DONTWAIT);
-
- // If there's no message available, return immediately.
- // The same when error occurs.
- if (rc != 0) {
- xs_assert (errno == EAGAIN);
- return false;
- }
-
- // Check whether the message matches at least one subscription.
- if (!options.filter || match (&message)) {
- has_message = true;
- return true;
- }
-
- // Message doesn't match. Pop any remaining parts of the message
- // from the pipe.
- while (message.flags () & msg_t::more) {
- rc = fq.recv (&message, XS_DONTWAIT);
- xs_assert (rc == 0);
- }
- }
+ return fq.has_in ();
}
-bool xs::xsub_t::match (msg_t *msg_)
-{
- for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it)
- if (it->type->sf_match ((void*) (core_t*) this, it->instance,
- (unsigned char*) msg_->data (), msg_->size ()))
- return true;
- return false;
-}
-
-int xs::xsub_t::filter_subscribed (const unsigned char *data_, size_t size_)
+void xs::xsub_t::send_subscription (pipe_t *pipe_, bool subscribe_,
+ int filter_id_, const unsigned char *data_, size_t size_)
{
// Create the subsctription message.
msg_t msg;
int rc = msg.init_size (size_ + 4);
xs_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
- put_uint16 (data, XS_CMD_SUBSCRIBE);
- put_uint16 (data + 2, tmp_filter_id);
+ put_uint16 (data, subscribe_ ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE);
+ put_uint16 (data + 2, filter_id_);
memcpy (data + 4, data_, size_);
-#if 0
- // TODO: This is 0MQ/3.1 protocol.
- xs_assert (tmp_filter_id == XS_FILTER_PREFIX);
- msg_t msg;
- int rc = msg.init_size (size_ + 1);
- xs_assert (rc == 0);
- unsigned char *data = (unsigned char*) msg.data ();
- data [0] = 1;
- memcpy (data + 1, data_, size_);
-#endif
-
// Send it to the pipe.
- bool sent = tmp_pipe->write (&msg);
+ bool sent = pipe_->write (&msg);
// If we reached the SNDHWM, and thus cannot send the subscription, drop
// the subscription message instead. This matches the behaviour of
@@ -314,8 +174,6 @@ int xs::xsub_t::filter_subscribed (const unsigned char *data_, size_t size_)
// when the SNDHWM is reached.
if (!sent)
msg.close ();
-
- return 0;
}
xs::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_,