summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-04-14 14:33:59 +0200
committerMartin Sustrik <sustrik@250bpm.com>2012-04-15 06:56:54 +0200
commite86827511b35231679085dc236e9744184ed4609 (patch)
treec8aea816ed2c5ed6b1d383367c6e34576b2c3d7c /src
parent71cf5791422c7a461520fce8225342c27821e774 (diff)
Filters can transform user subscriptions to wire subscription
Till now the subscription, as specified by the user, was send upstream. This patch allows SUB-side filter to transform the user subscription into wire subscription. For example, only a has can be sent upstream instead of a perfect subscription. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/prefix_filter.cpp6
-rw-r--r--src/sub.cpp73
-rw-r--r--src/sub.hpp12
-rw-r--r--src/xsub.hpp3
4 files changed, 63 insertions, 31 deletions
diff --git a/src/prefix_filter.cpp b/src/prefix_filter.cpp
index 0514763..1a9b8ac 100644
--- a/src/prefix_filter.cpp
+++ b/src/prefix_filter.cpp
@@ -525,14 +525,16 @@ static void sf_destroy (void *core_, void *sf_)
static int sf_subscribe (void *core_, void *sf_,
const unsigned char *data_, size_t size_)
{
- pfx_add ((pfx_node_t*) sf_, data_, size_, NULL);
+ if (pfx_add ((pfx_node_t*) sf_, data_, size_, NULL))
+ return xs_filter_subscribed (core_, data_, size_);
return 0;
}
static int sf_unsubscribe (void *core_, void *sf_,
const unsigned char *data_, size_t size_)
{
- pfx_rm ((pfx_node_t*) sf_, data_, size_, NULL);
+ if (pfx_rm ((pfx_node_t*) sf_, data_, size_, NULL))
+ return xs_filter_unsubscribed (core_, data_, size_);
return 0;
}
diff --git a/src/sub.cpp b/src/sub.cpp
index 2aa3901..d29ae8d 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -78,37 +78,18 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_,
int rc = it->type->sf_subscribe ((void*) (core_t*) this, it->instance,
(const unsigned char*) optval_, optvallen_);
errno_assert (rc == 0);
+ return 0;
}
else if (option_ == XS_UNSUBSCRIBE) {
xs_assert (it != filters.end ());
int rc = it->type->sf_unsubscribe ((void*) (core_t*) this, it->instance,
(const unsigned char*) optval_, optvallen_);
errno_assert (rc == 0);
+ return 0;
}
-
- // Create the subscription message.
- msg_t msg;
- int rc = msg.init_size (optvallen_ + 4);
- errno_assert (rc == 0);
- unsigned char *data = (unsigned char*) msg.data ();
- if (option_ == XS_SUBSCRIBE)
- put_uint16 (data, XS_CMD_SUBSCRIBE);
- else if (option_ == XS_UNSUBSCRIBE)
- put_uint16 (data, XS_CMD_UNSUBSCRIBE);
- put_uint16 (data + 2, options.filter);
- memcpy (data + 4, optval_, optvallen_);
-
- // Pass it further on in the stack.
- int err = 0;
- rc = xsub_t::xsend (&msg, 0);
- if (rc != 0)
- err = errno;
- int rc2 = msg.close ();
- errno_assert (rc2 == 0);
- if (rc != 0)
- errno = err;
- return rc;
+ xs_assert (false);
+ return -1;
}
int xs::sub_t::xsend (msg_t *msg_, int flags_)
@@ -214,6 +195,52 @@ bool xs::sub_t::match (msg_t *msg_)
return false;
}
+int xs::sub_t::filter_subscribed (const unsigned char *data_, size_t size_)
+{
+ // Create the subscription message.
+ msg_t msg;
+ int rc = msg.init_size (size_ + 4);
+ errno_assert (rc == 0);
+ unsigned char *data = (unsigned char*) msg.data ();
+ put_uint16 (data, XS_CMD_SUBSCRIBE);
+ put_uint16 (data + 2, options.filter);
+ memcpy (data + 4, data_, size_);
+
+ // Pass it further on in the stack.
+ int err = 0;
+ rc = xsub_t::xsend (&msg, 0);
+ if (rc != 0)
+ err = errno;
+ int rc2 = msg.close ();
+ errno_assert (rc2 == 0);
+ if (rc != 0)
+ errno = err;
+ return rc;
+}
+
+int xs::sub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)
+{
+ // Create the unsubscription message.
+ msg_t msg;
+ int rc = msg.init_size (size_ + 4);
+ errno_assert (rc == 0);
+ unsigned char *data = (unsigned char*) msg.data ();
+ put_uint16 (data, XS_CMD_UNSUBSCRIBE);
+ put_uint16 (data + 2, options.filter);
+ memcpy (data + 4, data_, size_);
+
+ // Pass it further on in the stack.
+ int err = 0;
+ rc = xsub_t::xsend (&msg, 0);
+ if (rc != 0)
+ err = errno;
+ int rc2 = msg.close ();
+ errno_assert (rc2 == 0);
+ if (rc != 0)
+ errno = err;
+ return rc;
+}
+
xs::sub_session_t::sub_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
diff --git a/src/sub.hpp b/src/sub.hpp
index 4c3bf7f..c50c0bd 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -27,6 +27,7 @@
#include "../include/xs.h"
#include "xsub.hpp"
+#include "core.hpp"
namespace xs
{
@@ -36,21 +37,26 @@ namespace xs
class io_thread_t;
class socket_base_t;
- class sub_t : public xsub_t
+ class sub_t : public xsub_t, public core_t
{
public:
sub_t (xs::ctx_t *parent_, uint32_t tid_, int sid_);
~sub_t ();
- protected:
+ private:
+ // Overloaded functions from socket_base_t.
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (xs::msg_t *msg_, int flags_);
int xrecv (xs::msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
+ // Overloaded functions from core_t.
+ int filter_subscribed (const unsigned char *data_, size_t size_);
+ int filter_unsubscribed (const unsigned char *data_, size_t size_);
+
// The repository of subscriptions.
struct filter_t
{
@@ -69,8 +75,6 @@ namespace xs
bool has_message;
msg_t message;
- private:
-
// Check whether the message matches at least one subscription.
bool match (xs::msg_t *msg_);
diff --git a/src/xsub.hpp b/src/xsub.hpp
index f8296d5..5bc11c4 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -28,7 +28,6 @@
#include "socket_base.hpp"
#include "session_base.hpp"
#include "dist.hpp"
-#include "core.hpp"
#include "fq.hpp"
namespace xs
@@ -38,7 +37,7 @@ namespace xs
class pipe_t;
class io_thread_t;
- class xsub_t : public socket_base_t, public core_t
+ class xsub_t : public socket_base_t
{
public: