From e86827511b35231679085dc236e9744184ed4609 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 14 Apr 2012 14:33:59 +0200 Subject: Filters can transform user subscriptions to wire subscription Till now the subscription, as specified by the user, was send upstream. This patch allows SUB-side filter to transform the user subscription into wire subscription. For example, only a has can be sent upstream instead of a perfect subscription. Signed-off-by: Martin Sustrik --- src/sub.cpp | 73 ++++++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 50 insertions(+), 23 deletions(-) (limited to 'src/sub.cpp') diff --git a/src/sub.cpp b/src/sub.cpp index 2aa3901..d29ae8d 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -78,37 +78,18 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_, int rc = it->type->sf_subscribe ((void*) (core_t*) this, it->instance, (const unsigned char*) optval_, optvallen_); errno_assert (rc == 0); + return 0; } else if (option_ == XS_UNSUBSCRIBE) { xs_assert (it != filters.end ()); int rc = it->type->sf_unsubscribe ((void*) (core_t*) this, it->instance, (const unsigned char*) optval_, optvallen_); errno_assert (rc == 0); + return 0; } - - // Create the subscription message. - msg_t msg; - int rc = msg.init_size (optvallen_ + 4); - errno_assert (rc == 0); - unsigned char *data = (unsigned char*) msg.data (); - if (option_ == XS_SUBSCRIBE) - put_uint16 (data, XS_CMD_SUBSCRIBE); - else if (option_ == XS_UNSUBSCRIBE) - put_uint16 (data, XS_CMD_UNSUBSCRIBE); - put_uint16 (data + 2, options.filter); - memcpy (data + 4, optval_, optvallen_); - - // Pass it further on in the stack. - int err = 0; - rc = xsub_t::xsend (&msg, 0); - if (rc != 0) - err = errno; - int rc2 = msg.close (); - errno_assert (rc2 == 0); - if (rc != 0) - errno = err; - return rc; + xs_assert (false); + return -1; } int xs::sub_t::xsend (msg_t *msg_, int flags_) @@ -214,6 +195,52 @@ bool xs::sub_t::match (msg_t *msg_) return false; } +int xs::sub_t::filter_subscribed (const unsigned char *data_, size_t size_) +{ + // Create the subscription message. + msg_t msg; + int rc = msg.init_size (size_ + 4); + errno_assert (rc == 0); + unsigned char *data = (unsigned char*) msg.data (); + put_uint16 (data, XS_CMD_SUBSCRIBE); + put_uint16 (data + 2, options.filter); + memcpy (data + 4, data_, size_); + + // Pass it further on in the stack. + int err = 0; + rc = xsub_t::xsend (&msg, 0); + if (rc != 0) + err = errno; + int rc2 = msg.close (); + errno_assert (rc2 == 0); + if (rc != 0) + errno = err; + return rc; +} + +int xs::sub_t::filter_unsubscribed (const unsigned char *data_, size_t size_) +{ + // Create the unsubscription message. + msg_t msg; + int rc = msg.init_size (size_ + 4); + errno_assert (rc == 0); + unsigned char *data = (unsigned char*) msg.data (); + put_uint16 (data, XS_CMD_UNSUBSCRIBE); + put_uint16 (data + 2, options.filter); + memcpy (data + 4, data_, size_); + + // Pass it further on in the stack. + int err = 0; + rc = xsub_t::xsend (&msg, 0); + if (rc != 0) + err = errno; + int rc2 = msg.close (); + errno_assert (rc2 == 0); + if (rc != 0) + errno = err; + return rc; +} + xs::sub_session_t::sub_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : -- cgit v1.2.3