From e86827511b35231679085dc236e9744184ed4609 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 14 Apr 2012 14:33:59 +0200 Subject: Filters can transform user subscriptions to wire subscription Till now the subscription, as specified by the user, was send upstream. This patch allows SUB-side filter to transform the user subscription into wire subscription. For example, only a has can be sent upstream instead of a perfect subscription. Signed-off-by: Martin Sustrik --- src/prefix_filter.cpp | 6 +++-- src/sub.cpp | 73 +++++++++++++++++++++++++++++++++++---------------- src/sub.hpp | 12 ++++++--- src/xsub.hpp | 3 +-- 4 files changed, 63 insertions(+), 31 deletions(-) diff --git a/src/prefix_filter.cpp b/src/prefix_filter.cpp index 0514763..1a9b8ac 100644 --- a/src/prefix_filter.cpp +++ b/src/prefix_filter.cpp @@ -525,14 +525,16 @@ static void sf_destroy (void *core_, void *sf_) static int sf_subscribe (void *core_, void *sf_, const unsigned char *data_, size_t size_) { - pfx_add ((pfx_node_t*) sf_, data_, size_, NULL); + if (pfx_add ((pfx_node_t*) sf_, data_, size_, NULL)) + return xs_filter_subscribed (core_, data_, size_); return 0; } static int sf_unsubscribe (void *core_, void *sf_, const unsigned char *data_, size_t size_) { - pfx_rm ((pfx_node_t*) sf_, data_, size_, NULL); + if (pfx_rm ((pfx_node_t*) sf_, data_, size_, NULL)) + return xs_filter_unsubscribed (core_, data_, size_); return 0; } diff --git a/src/sub.cpp b/src/sub.cpp index 2aa3901..d29ae8d 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -78,37 +78,18 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_, int rc = it->type->sf_subscribe ((void*) (core_t*) this, it->instance, (const unsigned char*) optval_, optvallen_); errno_assert (rc == 0); + return 0; } else if (option_ == XS_UNSUBSCRIBE) { xs_assert (it != filters.end ()); int rc = it->type->sf_unsubscribe ((void*) (core_t*) this, it->instance, (const unsigned char*) optval_, optvallen_); errno_assert (rc == 0); + return 0; } - - // Create the subscription message. - msg_t msg; - int rc = msg.init_size (optvallen_ + 4); - errno_assert (rc == 0); - unsigned char *data = (unsigned char*) msg.data (); - if (option_ == XS_SUBSCRIBE) - put_uint16 (data, XS_CMD_SUBSCRIBE); - else if (option_ == XS_UNSUBSCRIBE) - put_uint16 (data, XS_CMD_UNSUBSCRIBE); - put_uint16 (data + 2, options.filter); - memcpy (data + 4, optval_, optvallen_); - - // Pass it further on in the stack. - int err = 0; - rc = xsub_t::xsend (&msg, 0); - if (rc != 0) - err = errno; - int rc2 = msg.close (); - errno_assert (rc2 == 0); - if (rc != 0) - errno = err; - return rc; + xs_assert (false); + return -1; } int xs::sub_t::xsend (msg_t *msg_, int flags_) @@ -214,6 +195,52 @@ bool xs::sub_t::match (msg_t *msg_) return false; } +int xs::sub_t::filter_subscribed (const unsigned char *data_, size_t size_) +{ + // Create the subscription message. + msg_t msg; + int rc = msg.init_size (size_ + 4); + errno_assert (rc == 0); + unsigned char *data = (unsigned char*) msg.data (); + put_uint16 (data, XS_CMD_SUBSCRIBE); + put_uint16 (data + 2, options.filter); + memcpy (data + 4, data_, size_); + + // Pass it further on in the stack. + int err = 0; + rc = xsub_t::xsend (&msg, 0); + if (rc != 0) + err = errno; + int rc2 = msg.close (); + errno_assert (rc2 == 0); + if (rc != 0) + errno = err; + return rc; +} + +int xs::sub_t::filter_unsubscribed (const unsigned char *data_, size_t size_) +{ + // Create the unsubscription message. + msg_t msg; + int rc = msg.init_size (size_ + 4); + errno_assert (rc == 0); + unsigned char *data = (unsigned char*) msg.data (); + put_uint16 (data, XS_CMD_UNSUBSCRIBE); + put_uint16 (data + 2, options.filter); + memcpy (data + 4, data_, size_); + + // Pass it further on in the stack. + int err = 0; + rc = xsub_t::xsend (&msg, 0); + if (rc != 0) + err = errno; + int rc2 = msg.close (); + errno_assert (rc2 == 0); + if (rc != 0) + errno = err; + return rc; +} + xs::sub_session_t::sub_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : diff --git a/src/sub.hpp b/src/sub.hpp index 4c3bf7f..c50c0bd 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -27,6 +27,7 @@ #include "../include/xs.h" #include "xsub.hpp" +#include "core.hpp" namespace xs { @@ -36,21 +37,26 @@ namespace xs class io_thread_t; class socket_base_t; - class sub_t : public xsub_t + class sub_t : public xsub_t, public core_t { public: sub_t (xs::ctx_t *parent_, uint32_t tid_, int sid_); ~sub_t (); - protected: + private: + // Overloaded functions from socket_base_t. int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (xs::msg_t *msg_, int flags_); int xrecv (xs::msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); + // Overloaded functions from core_t. + int filter_subscribed (const unsigned char *data_, size_t size_); + int filter_unsubscribed (const unsigned char *data_, size_t size_); + // The repository of subscriptions. struct filter_t { @@ -69,8 +75,6 @@ namespace xs bool has_message; msg_t message; - private: - // Check whether the message matches at least one subscription. bool match (xs::msg_t *msg_); diff --git a/src/xsub.hpp b/src/xsub.hpp index f8296d5..5bc11c4 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -28,7 +28,6 @@ #include "socket_base.hpp" #include "session_base.hpp" #include "dist.hpp" -#include "core.hpp" #include "fq.hpp" namespace xs @@ -38,7 +37,7 @@ namespace xs class pipe_t; class io_thread_t; - class xsub_t : public socket_base_t, public core_t + class xsub_t : public socket_base_t { public: -- cgit v1.2.3