From 19894e0a1b6fbbcb62028fc6513ef3904a6f5c76 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 13 Apr 2012 09:34:13 +0200 Subject: Separate subscription forwarding from SUB-side filtering - subscription forwarding is handled by XSUB socket - filtering is handled by SUB sockets - subscriptions are decoupled from filter engines - filter doesn't have to be able to enumarate the subscriptions (no sf_enumerate function) Signed-off-by: Martin Sustrik --- src/options.cpp | 7 +- src/options.hpp | 5 +- src/prefix_filter.cpp | 51 +----------- src/sub.cpp | 150 ++++++++++++++++++++++++++++++---- src/sub.hpp | 27 ++++++ src/xsub.cpp | 222 +++++++++----------------------------------------- src/xsub.hpp | 33 ++------ 7 files changed, 213 insertions(+), 282 deletions(-) (limited to 'src') diff --git a/src/options.cpp b/src/options.cpp index fdbffde..2a72add 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -49,10 +49,9 @@ xs::options_t::options_t () : ipv4only (1), keepalive (0), protocol (0), - filter_id (XS_FILTER_PREFIX), + filter (XS_FILTER_PREFIX), delay_on_close (true), delay_on_disconnect (true), - filter (false), send_identity (false), recv_identity (false), socket_id (0) @@ -256,7 +255,7 @@ int xs::options_t::setsockopt (int option_, const void *optval_, errno = EINVAL; return -1; } - filter_id = *((int*) optval_); + filter = *((int*) optval_); return 0; } @@ -454,7 +453,7 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) errno = EINVAL; return -1; } - *((int*) optval_) = filter_id; + *((int*) optval_) = filter; *optvallen_ = sizeof (int); return 0; diff --git a/src/options.hpp b/src/options.hpp index 1288f72..fac6084 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -96,7 +96,7 @@ namespace xs int protocol; // Filter ID to be used with subscriptions and unsubscriptions. - int filter_id; + int filter; // If true, session reads all the pending messages from the pipe and // sends them to the network when socket is closed. @@ -106,9 +106,6 @@ namespace xs // them to the user when the peer terminates. bool delay_on_disconnect; - // If 1, (X)SUB socket should filter the messages. If 0, it should not. - bool filter; - // Sends identity to all new connections. bool send_identity; diff --git a/src/prefix_filter.cpp b/src/prefix_filter.cpp index d13c5b4..0514763 100644 --- a/src/prefix_filter.cpp +++ b/src/prefix_filter.cpp @@ -422,43 +422,6 @@ static bool pfx_rm (pfx_node_t *node_, const unsigned char *prefix_, return ret; } -static void pfx_list (pfx_node_t *node_, unsigned char **buff_, - size_t buffsize_, size_t maxbuffsize_, void *arg_) -{ - // If this node is a subscription, apply the function. - if (node_->subscribers) { - int rc = xs_filter_subscribed (arg_, *buff_, buffsize_); - errno_assert (rc == 0); - } - - // Adjust the buffer. - if (buffsize_ >= maxbuffsize_) { - maxbuffsize_ = buffsize_ + 256; - *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); - xs_assert (*buff_); - } - - // If there are no subnodes in the trie, return. - if (node_->count == 0) - return; - - // If there's one subnode (optimisation). - if (node_->count == 1) { - (*buff_) [buffsize_] = node_->min; - buffsize_++; - pfx_list (node_->next.node, buff_, buffsize_, maxbuffsize_, arg_); - return; - } - - // If there are multiple subnodes. - for (unsigned short c = 0; c != node_->count; c++) { - (*buff_) [buffsize_] = node_->min + c; - if (node_->next.table [c]) - pfx_list (node_->next.table [c], buff_, buffsize_ + 1, - maxbuffsize_, arg_); - } -} - // Implementation of the public filter interface. static int id (void *core_) @@ -562,20 +525,15 @@ static void sf_destroy (void *core_, void *sf_) static int sf_subscribe (void *core_, void *sf_, const unsigned char *data_, size_t size_) { - return pfx_add ((pfx_node_t*) sf_, data_, size_, NULL) ? 1 : 0; + pfx_add ((pfx_node_t*) sf_, data_, size_, NULL); + return 0; } static int sf_unsubscribe (void *core_, void *sf_, const unsigned char *data_, size_t size_) { - return pfx_rm ((pfx_node_t*) sf_, data_, size_, NULL) ? 1 : 0; -} - -static void sf_enumerate (void *core_, void *sf_) -{ - unsigned char *buff = NULL; - pfx_list ((pfx_node_t*) sf_, &buff, 0, 0, core_); - free (buff); + pfx_rm ((pfx_node_t*) sf_, data_, size_, NULL); + return 0; } static int sf_match (void *core_, void *sf_, @@ -627,7 +585,6 @@ static xs_filter_t pfx_filter = { sf_destroy, sf_subscribe, sf_unsubscribe, - sf_enumerate, sf_match, }; diff --git a/src/sub.cpp b/src/sub.cpp index 442dd9b..2aa3901 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -24,17 +24,24 @@ #include "wire.hpp" xs::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : - xsub_t (parent_, tid_, sid_) + xsub_t (parent_, tid_, sid_), + more (false), + has_message (false) { options.type = XS_SUB; - // Switch filtering messages on (as opposed to XSUB which where the - // filtering is off). - options.filter = true; + int rc = message.init (); + errno_assert (rc == 0); } xs::sub_t::~sub_t () { + // Deallocate all the filters. + for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it) + it->type->sf_destroy ((void*) (core_t*) this, it->instance); + + int rc = message.close (); + errno_assert (rc == 0); } int xs::sub_t::xsetsockopt (int option_, const void *optval_, @@ -50,6 +57,36 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_, return -1; } + // Find the relevant filter. + filters_t::iterator it; + for (it = filters.begin (); it != filters.end (); ++it) + if (it->type->id (NULL) == options.filter) + break; + + // Process the subscription. If the filter of the specified type does not + // exist yet, create it. + if (option_ == XS_SUBSCRIBE) { + if (it == filters.end ()) { + filter_t f; + f.type = get_filter (options.filter); + xs_assert (f.type); + f.instance = f.type->sf_create ((void*) (core_t*) this); + xs_assert (f.instance); + filters.push_back (f); + it = filters.end () - 1; + } + int rc = it->type->sf_subscribe ((void*) (core_t*) this, it->instance, + (const unsigned char*) optval_, optvallen_); + errno_assert (rc == 0); + } + else if (option_ == XS_UNSUBSCRIBE) { + xs_assert (it != filters.end ()); + int rc = it->type->sf_unsubscribe ((void*) (core_t*) this, it->instance, + (const unsigned char*) optval_, optvallen_); + errno_assert (rc == 0); + } + + // Create the subscription message. msg_t msg; int rc = msg.init_size (optvallen_ + 4); @@ -59,22 +96,9 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_, put_uint16 (data, XS_CMD_SUBSCRIBE); else if (option_ == XS_UNSUBSCRIBE) put_uint16 (data, XS_CMD_UNSUBSCRIBE); - put_uint16 (data + 2, options.filter_id); + put_uint16 (data + 2, options.filter); memcpy (data + 4, optval_, optvallen_); -#if 0 - // TODO: This is 0MQ/3.1 protocol. - msg_t msg; - int rc = msg.init_size (optvallen_ + 1); - errno_assert (rc == 0); - unsigned char *data = (unsigned char*) msg.data (); - if (option_ == XS_SUBSCRIBE) - data [0] = 1; - else if (option_ == XS_UNSUBSCRIBE) - data [0] = 0; - memcpy (data + 1, optval_, optvallen_); -#endif - // Pass it further on in the stack. int err = 0; rc = xsub_t::xsend (&msg, 0); @@ -94,12 +118,102 @@ int xs::sub_t::xsend (msg_t *msg_, int flags_) return -1; } +int xs::sub_t::xrecv (msg_t *msg_, int flags_) +{ + // If there's already a message prepared by a previous call to xs_poll, + // return it straight ahead. + if (has_message) { + int rc = msg_->move (message); + errno_assert (rc == 0); + has_message = false; + more = msg_->flags () & msg_t::more ? true : false; + return 0; + } + + // TODO: This can result in infinite loop in the case of continuous + // stream of non-matching messages which breaks the non-blocking recv + // semantics. + while (true) { + + // Get a message using fair queueing algorithm. + int rc = xsub_t::xrecv (msg_, flags_); + + // If there's no message available, return immediately. + // The same when error occurs. + if (rc != 0) + return -1; + + // Check whether the message matches at least one subscription. + // Non-initial parts of the message are passed automatically. + if (more || match (msg_)) { + more = msg_->flags () & msg_t::more ? true : false; + return 0; + } + + // Message doesn't match. Pop any remaining parts of the message + // from the pipe. + while (msg_->flags () & msg_t::more) { + rc = xsub_t::xrecv (msg_, XS_DONTWAIT); + xs_assert (rc == 0); + } + } +} + +bool xs::sub_t::xhas_in () +{ + // There are subsequent parts of the partly-read message available. + if (more) + return true; + + // If there's already a message prepared by a previous call to xs_poll, + // return straight ahead. + if (has_message) + return true; + + // TODO: This can result in infinite loop in the case of continuous + // stream of non-matching messages. + while (true) { + + // Get a message using fair queueing algorithm. + int rc = xsub_t::xrecv (&message, XS_DONTWAIT); + + // If there's no message available, return immediately. + // The same when error occurs. + if (rc != 0) { + xs_assert (errno == EAGAIN); + return false; + } + + // Check whether the message matches at least one subscription. + if (match (&message)) { + has_message = true; + return true; + } + + // Message doesn't match. Pop any remaining parts of the message + // from the pipe. + while (message.flags () & msg_t::more) { + rc = xsub_t::xrecv (&message, XS_DONTWAIT); + xs_assert (rc == 0); + } + } +} + bool xs::sub_t::xhas_out () { // Overload the XSUB's send. return false; } +bool xs::sub_t::match (msg_t *msg_) +{ + for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it) + if (it->type->sf_match ((void*) (core_t*) this, it->instance, + (unsigned char*) msg_->data (), msg_->size ())) + return true; + return false; +} + xs::sub_session_t::sub_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : diff --git a/src/sub.hpp b/src/sub.hpp index 47b1877..4c3bf7f 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -22,6 +22,10 @@ #ifndef __XS_SUB_HPP_INCLUDED__ #define __XS_SUB_HPP_INCLUDED__ +#include + +#include "../include/xs.h" + #include "xsub.hpp" namespace xs @@ -43,10 +47,33 @@ namespace xs int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (xs::msg_t *msg_, int flags_); + int xrecv (xs::msg_t *msg_, int flags_); + bool xhas_in (); bool xhas_out (); + // The repository of subscriptions. + struct filter_t + { + xs_filter_t *type; + void *instance; + }; + typedef std::vector filters_t; + filters_t filters; + + // If true, part of a multipart message was already received, but + // there are following parts still waiting. + bool more; + + // If true, 'message' contains a matching message to return on the + // next recv call. + bool has_message; + msg_t message; + private: + // Check whether the message matches at least one subscription. + bool match (xs::msg_t *msg_); + sub_t (const sub_t&); const sub_t &operator = (const sub_t&); }; diff --git a/src/xsub.cpp b/src/xsub.cpp index da56586..3afd734 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -26,11 +26,7 @@ #include "wire.hpp" xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : - socket_base_t (parent_, tid_, sid_), - has_message (false), - more (false), - tmp_pipe (NULL), - tmp_filter_id (-1) + socket_base_t (parent_, tid_, sid_) { options.type = XS_XSUB; @@ -40,19 +36,10 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : // Also, we want the subscription buffer to be elastic by default. options.sndhwm = 0; - - int rc = message.init (); - errno_assert (rc == 0); } xs::xsub_t::~xsub_t () { - // Deallocate all the filters. - for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it) - it->type->sf_destroy ((void*) (core_t*) this, it->instance); - - int rc = message.close (); - errno_assert (rc == 0); } void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) @@ -62,19 +49,17 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) // Pipes with 0MQ/2.1-style protocol are not eligible for accepting // subscriptions. - if (pipe_->get_protocol () != 1) - dist.attach (pipe_); + if (pipe_->get_protocol () == 1) + return; + + dist.attach (pipe_); // Send all the cached subscriptions to the new upstream peer. - tmp_pipe = pipe_; - for (filters_t::iterator it = filters.begin (); it != filters.end (); - ++it) { - tmp_filter_id = it->type->id (NULL); - it->type->sf_enumerate ((void*) (core_t*) this, it->instance); - tmp_filter_id = -1; - } + for (subscriptions_t::iterator its = subscriptions.begin (); + its != subscriptions.end (); ++its) + send_subscription (pipe_, true, its->first.first, + its->first.second.data (), its->first.second.size ()); pipe_->flush (); - tmp_pipe = NULL; } void xs::xsub_t::xread_activated (pipe_t *pipe_) @@ -96,18 +81,16 @@ void xs::xsub_t::xterminated (pipe_t *pipe_) void xs::xsub_t::xhiccuped (pipe_t *pipe_) { - // Send all the cached subscriptions to the hiccuped pipe. - if (pipe_->get_protocol () != 1) { - tmp_pipe = pipe_; - for (filters_t::iterator it = filters.begin (); it != filters.end (); - ++it) { - tmp_filter_id = it->type->id (NULL); - it->type->sf_enumerate ((void*) (core_t*) this, it->instance); - tmp_filter_id = -1; - } - pipe_->flush (); - tmp_pipe = NULL; - } + // In 0MQ/2.1 protocol there is no subscription forwarding. + if (pipe_->get_protocol () == 1) + return; + + // Send all the cached subscriptions to the new upstream peer. + for (subscriptions_t::iterator its = subscriptions.begin (); + its != subscriptions.end (); ++its) + send_subscription (pipe_, true, its->first.first, + its->first.second.data (), its->first.second.size ()); + pipe_->flush (); } int xs::xsub_t::xsend (msg_t *msg_, int flags_) @@ -121,66 +104,33 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_) } int cmd = get_uint16 (data); int filter_id = get_uint16 (data + 2); - -#if 0 - // TODO: This is 0MQ/3.1 protocol. - if (size < 1) { - errno = EINVAL; - return -1; - } - int cmd = data [0] ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE; - int filter_id = XS_FILTER_PREFIX; -#endif - if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) { errno = EINVAL; return -1; } - - // Find the relevant filter. - filters_t::iterator it; - for (it = filters.begin (); it != filters.end (); ++it) - if (it->type->id (NULL) == filter_id) - break; // Process the subscription. if (cmd == XS_CMD_SUBSCRIBE) { - - // If the filter of the specified type does not exist yet, create it. - if (it == filters.end ()) { - filter_t f; - f.type = get_filter (filter_id); - xs_assert (f.type); - f.instance = f.type->sf_create ((void*) (core_t*) this); - xs_assert (f.instance); - filters.push_back (f); - it = filters.end () - 1; - } - - if (it->type->sf_subscribe ((void*) (core_t*) this, - it->instance, data + 4, size - 4) == 1) -#if 0 - // TODO: This is 0MQ/3.1 protocol. - if (it->type->sf_subscribe ((void*) (core_t*) this, - it->instance, data + 1, size - 1) == 1) -#endif + subscriptions_t::iterator it = subscriptions.insert ( + std::make_pair (std::make_pair (filter_id, + blob_t (data + 4, size - 4)), 0)).first; + ++it->second; + if (it->second == 1) return dist.send_to_all (msg_, flags_); else return 0; } else if (cmd == XS_CMD_UNSUBSCRIBE) { - xs_assert (it != filters.end ()); - - if (it->type->sf_unsubscribe ((void*) (core_t*) this, - it->instance, data + 4, size - 4) == 1) -#if 0 - // TODO: This is 0MQ/3.1 protocol. - if (it->type->sf_unsubscribe ((void*) (core_t*) this, - it->instance, data + 1, size - 1) == 1) -#endif - return dist.send_to_all (msg_, flags_); - else + subscriptions_t::iterator it = subscriptions.find ( + std::make_pair (filter_id, blob_t (data + 4, size - 4))); + if (it == subscriptions.end ()) return 0; + xs_assert (it->second); + --it->second; + if (it->second) + return 0; + subscriptions.erase (it); + return dist.send_to_all (msg_, flags_); } xs_assert (false); @@ -195,118 +145,28 @@ bool xs::xsub_t::xhas_out () int xs::xsub_t::xrecv (msg_t *msg_, int flags_) { - // If there's already a message prepared by a previous call to xs_poll, - // return it straight ahead. - if (has_message) { - int rc = msg_->move (message); - errno_assert (rc == 0); - has_message = false; - more = msg_->flags () & msg_t::more ? true : false; - return 0; - } - - // TODO: This can result in infinite loop in the case of continuous - // stream of non-matching messages which breaks the non-blocking recv - // semantics. - while (true) { - - // Get a message using fair queueing algorithm. - int rc = fq.recv (msg_, flags_); - - // If there's no message available, return immediately. - // The same when error occurs. - if (rc != 0) - return -1; - - // Check whether the message matches at least one subscription. - // Non-initial parts of the message are passed - if (more || !options.filter || match (msg_)) { - more = msg_->flags () & msg_t::more ? true : false; - return 0; - } - - // Message doesn't match. Pop any remaining parts of the message - // from the pipe. - while (msg_->flags () & msg_t::more) { - rc = fq.recv (msg_, XS_DONTWAIT); - xs_assert (rc == 0); - } - } + return fq.recv (msg_, flags_); } bool xs::xsub_t::xhas_in () { - // There are subsequent parts of the partly-read message available. - if (more) - return true; - - // If there's already a message prepared by a previous call to xs_poll, - // return straight ahead. - if (has_message) - return true; - - // TODO: This can result in infinite loop in the case of continuous - // stream of non-matching messages. - while (true) { - - // Get a message using fair queueing algorithm. - int rc = fq.recv (&message, XS_DONTWAIT); - - // If there's no message available, return immediately. - // The same when error occurs. - if (rc != 0) { - xs_assert (errno == EAGAIN); - return false; - } - - // Check whether the message matches at least one subscription. - if (!options.filter || match (&message)) { - has_message = true; - return true; - } - - // Message doesn't match. Pop any remaining parts of the message - // from the pipe. - while (message.flags () & msg_t::more) { - rc = fq.recv (&message, XS_DONTWAIT); - xs_assert (rc == 0); - } - } + return fq.has_in (); } -bool xs::xsub_t::match (msg_t *msg_) -{ - for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it) - if (it->type->sf_match ((void*) (core_t*) this, it->instance, - (unsigned char*) msg_->data (), msg_->size ())) - return true; - return false; -} - -int xs::xsub_t::filter_subscribed (const unsigned char *data_, size_t size_) +void xs::xsub_t::send_subscription (pipe_t *pipe_, bool subscribe_, + int filter_id_, const unsigned char *data_, size_t size_) { // Create the subsctription message. msg_t msg; int rc = msg.init_size (size_ + 4); xs_assert (rc == 0); unsigned char *data = (unsigned char*) msg.data (); - put_uint16 (data, XS_CMD_SUBSCRIBE); - put_uint16 (data + 2, tmp_filter_id); + put_uint16 (data, subscribe_ ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE); + put_uint16 (data + 2, filter_id_); memcpy (data + 4, data_, size_); -#if 0 - // TODO: This is 0MQ/3.1 protocol. - xs_assert (tmp_filter_id == XS_FILTER_PREFIX); - msg_t msg; - int rc = msg.init_size (size_ + 1); - xs_assert (rc == 0); - unsigned char *data = (unsigned char*) msg.data (); - data [0] = 1; - memcpy (data + 1, data_, size_); -#endif - // Send it to the pipe. - bool sent = tmp_pipe->write (&msg); + bool sent = pipe_->write (&msg); // If we reached the SNDHWM, and thus cannot send the subscription, drop // the subscription message instead. This matches the behaviour of @@ -314,8 +174,6 @@ int xs::xsub_t::filter_subscribed (const unsigned char *data_, size_t size_) // when the SNDHWM is reached. if (!sent) msg.close (); - - return 0; } xs::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_, diff --git a/src/xsub.hpp b/src/xsub.hpp index 4621570..f8296d5 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -21,7 +21,7 @@ #ifndef __XS_XSUB_HPP_INCLUDED__ #define __XS_XSUB_HPP_INCLUDED__ -#include +#include #include "../include/xs.h" @@ -60,11 +60,8 @@ namespace xs private: - // Overloads from core_t class. - int filter_subscribed (const unsigned char *data_, size_t size_); - - // Check whether the message matches at least one subscription. - bool match (xs::msg_t *msg_); + void send_subscription (pipe_t *pipe_, bool subscribe_, int filter_id_, + const unsigned char *data_, size_t size_); // Fair queueing object for inbound pipes. fq_t fq; @@ -72,27 +69,9 @@ namespace xs // Object for distributing the subscriptions upstream. dist_t dist; - // The repository of subscriptions. - struct filter_t - { - xs_filter_t *type; - void *instance; - }; - typedef std::vector filters_t; - filters_t filters; - - // If true, 'message' contains a matching message to return on the - // next recv call. - bool has_message; - msg_t message; - - // If true, part of a multipart message was already received, but - // there are following parts still waiting. - bool more; - - // Different values stored while filter extensions are being executed. - pipe_t *tmp_pipe; - int tmp_filter_id; + // Cache of all subscriptions in place at the moment. + typedef std::map , int> subscriptions_t; + subscriptions_t subscriptions; xsub_t (const xsub_t&); const xsub_t &operator = (const xsub_t&); -- cgit v1.2.3