From 19894e0a1b6fbbcb62028fc6513ef3904a6f5c76 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 13 Apr 2012 09:34:13 +0200 Subject: Separate subscription forwarding from SUB-side filtering - subscription forwarding is handled by XSUB socket - filtering is handled by SUB sockets - subscriptions are decoupled from filter engines - filter doesn't have to be able to enumarate the subscriptions (no sf_enumerate function) Signed-off-by: Martin Sustrik --- src/sub.cpp | 150 ++++++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 132 insertions(+), 18 deletions(-) (limited to 'src/sub.cpp') diff --git a/src/sub.cpp b/src/sub.cpp index 442dd9b..2aa3901 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -24,17 +24,24 @@ #include "wire.hpp" xs::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : - xsub_t (parent_, tid_, sid_) + xsub_t (parent_, tid_, sid_), + more (false), + has_message (false) { options.type = XS_SUB; - // Switch filtering messages on (as opposed to XSUB which where the - // filtering is off). - options.filter = true; + int rc = message.init (); + errno_assert (rc == 0); } xs::sub_t::~sub_t () { + // Deallocate all the filters. + for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it) + it->type->sf_destroy ((void*) (core_t*) this, it->instance); + + int rc = message.close (); + errno_assert (rc == 0); } int xs::sub_t::xsetsockopt (int option_, const void *optval_, @@ -50,6 +57,36 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_, return -1; } + // Find the relevant filter. + filters_t::iterator it; + for (it = filters.begin (); it != filters.end (); ++it) + if (it->type->id (NULL) == options.filter) + break; + + // Process the subscription. If the filter of the specified type does not + // exist yet, create it. + if (option_ == XS_SUBSCRIBE) { + if (it == filters.end ()) { + filter_t f; + f.type = get_filter (options.filter); + xs_assert (f.type); + f.instance = f.type->sf_create ((void*) (core_t*) this); + xs_assert (f.instance); + filters.push_back (f); + it = filters.end () - 1; + } + int rc = it->type->sf_subscribe ((void*) (core_t*) this, it->instance, + (const unsigned char*) optval_, optvallen_); + errno_assert (rc == 0); + } + else if (option_ == XS_UNSUBSCRIBE) { + xs_assert (it != filters.end ()); + int rc = it->type->sf_unsubscribe ((void*) (core_t*) this, it->instance, + (const unsigned char*) optval_, optvallen_); + errno_assert (rc == 0); + } + + // Create the subscription message. msg_t msg; int rc = msg.init_size (optvallen_ + 4); @@ -59,22 +96,9 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_, put_uint16 (data, XS_CMD_SUBSCRIBE); else if (option_ == XS_UNSUBSCRIBE) put_uint16 (data, XS_CMD_UNSUBSCRIBE); - put_uint16 (data + 2, options.filter_id); + put_uint16 (data + 2, options.filter); memcpy (data + 4, optval_, optvallen_); -#if 0 - // TODO: This is 0MQ/3.1 protocol. - msg_t msg; - int rc = msg.init_size (optvallen_ + 1); - errno_assert (rc == 0); - unsigned char *data = (unsigned char*) msg.data (); - if (option_ == XS_SUBSCRIBE) - data [0] = 1; - else if (option_ == XS_UNSUBSCRIBE) - data [0] = 0; - memcpy (data + 1, optval_, optvallen_); -#endif - // Pass it further on in the stack. int err = 0; rc = xsub_t::xsend (&msg, 0); @@ -94,12 +118,102 @@ int xs::sub_t::xsend (msg_t *msg_, int flags_) return -1; } +int xs::sub_t::xrecv (msg_t *msg_, int flags_) +{ + // If there's already a message prepared by a previous call to xs_poll, + // return it straight ahead. + if (has_message) { + int rc = msg_->move (message); + errno_assert (rc == 0); + has_message = false; + more = msg_->flags () & msg_t::more ? true : false; + return 0; + } + + // TODO: This can result in infinite loop in the case of continuous + // stream of non-matching messages which breaks the non-blocking recv + // semantics. + while (true) { + + // Get a message using fair queueing algorithm. + int rc = xsub_t::xrecv (msg_, flags_); + + // If there's no message available, return immediately. + // The same when error occurs. + if (rc != 0) + return -1; + + // Check whether the message matches at least one subscription. + // Non-initial parts of the message are passed automatically. + if (more || match (msg_)) { + more = msg_->flags () & msg_t::more ? true : false; + return 0; + } + + // Message doesn't match. Pop any remaining parts of the message + // from the pipe. + while (msg_->flags () & msg_t::more) { + rc = xsub_t::xrecv (msg_, XS_DONTWAIT); + xs_assert (rc == 0); + } + } +} + +bool xs::sub_t::xhas_in () +{ + // There are subsequent parts of the partly-read message available. + if (more) + return true; + + // If there's already a message prepared by a previous call to xs_poll, + // return straight ahead. + if (has_message) + return true; + + // TODO: This can result in infinite loop in the case of continuous + // stream of non-matching messages. + while (true) { + + // Get a message using fair queueing algorithm. + int rc = xsub_t::xrecv (&message, XS_DONTWAIT); + + // If there's no message available, return immediately. + // The same when error occurs. + if (rc != 0) { + xs_assert (errno == EAGAIN); + return false; + } + + // Check whether the message matches at least one subscription. + if (match (&message)) { + has_message = true; + return true; + } + + // Message doesn't match. Pop any remaining parts of the message + // from the pipe. + while (message.flags () & msg_t::more) { + rc = xsub_t::xrecv (&message, XS_DONTWAIT); + xs_assert (rc == 0); + } + } +} + bool xs::sub_t::xhas_out () { // Overload the XSUB's send. return false; } +bool xs::sub_t::match (msg_t *msg_) +{ + for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it) + if (it->type->sf_match ((void*) (core_t*) this, it->instance, + (unsigned char*) msg_->data (), msg_->size ())) + return true; + return false; +} + xs::sub_session_t::sub_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : -- cgit v1.2.3