From d82cbb3a81f116cd22e9895ecac36ac3d7b38929 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 5 Apr 2012 07:32:58 +0200 Subject: XS_PLUGIN and XS_FILTER implementation This patch introduces following features: - XS_PLUGIN context option to add plugins to libxs - XS_FILTER option to switch between different filter types - Automatic loading of plug-ins is *not* implemented. From the implementation point of view: - standard prefix filter is implemented as a pluggable filter - trie_t and mtrie_t are joined into a single class - the code for 0MQ/3.1 compatibility is left in in the form of comments - new test for testing re-subscriptions is added Signed-off-by: Martin Sustrik --- src/xsub.cpp | 119 ++++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 101 insertions(+), 18 deletions(-) (limited to 'src/xsub.cpp') diff --git a/src/xsub.cpp b/src/xsub.cpp index af6789f..da56586 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -23,11 +23,14 @@ #include "xsub.hpp" #include "err.hpp" +#include "wire.hpp" xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), has_message (false), - more (false) + more (false), + tmp_pipe (NULL), + tmp_filter_id (-1) { options.type = XS_XSUB; @@ -44,6 +47,10 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : xs::xsub_t::~xsub_t () { + // Deallocate all the filters. + for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it) + it->type->sf_destroy ((void*) (core_t*) this, it->instance); + int rc = message.close (); errno_assert (rc == 0); } @@ -59,8 +66,15 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) dist.attach (pipe_); // Send all the cached subscriptions to the new upstream peer. - subscriptions.apply (send_subscription, pipe_); + tmp_pipe = pipe_; + for (filters_t::iterator it = filters.begin (); it != filters.end (); + ++it) { + tmp_filter_id = it->type->id (NULL); + it->type->sf_enumerate ((void*) (core_t*) this, it->instance); + tmp_filter_id = -1; + } pipe_->flush (); + tmp_pipe = NULL; } void xs::xsub_t::xread_activated (pipe_t *pipe_) @@ -82,11 +96,17 @@ void xs::xsub_t::xterminated (pipe_t *pipe_) void xs::xsub_t::xhiccuped (pipe_t *pipe_) { + // Send all the cached subscriptions to the hiccuped pipe. if (pipe_->get_protocol () != 1) { - - // Send all the cached subscriptions to the hiccuped pipe. - subscriptions.apply (send_subscription, pipe_); + tmp_pipe = pipe_; + for (filters_t::iterator it = filters.begin (); it != filters.end (); + ++it) { + tmp_filter_id = it->type->id (NULL); + it->type->sf_enumerate ((void*) (core_t*) this, it->instance); + tmp_filter_id = -1; + } pipe_->flush (); + tmp_pipe = NULL; } } @@ -95,21 +115,69 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_) size_t size = msg_->size (); unsigned char *data = (unsigned char*) msg_->data (); - // Malformed subscriptions. - if (size < 1 || (*data != 0 && *data != 1)) { + if (size < 4) { + errno = EINVAL; + return -1; + } + int cmd = get_uint16 (data); + int filter_id = get_uint16 (data + 2); + +#if 0 + // TODO: This is 0MQ/3.1 protocol. + if (size < 1) { + errno = EINVAL; + return -1; + } + int cmd = data [0] ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE; + int filter_id = XS_FILTER_PREFIX; +#endif + + if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) { errno = EINVAL; return -1; } + + // Find the relevant filter. + filters_t::iterator it; + for (it = filters.begin (); it != filters.end (); ++it) + if (it->type->id (NULL) == filter_id) + break; + + // Process the subscription. + if (cmd == XS_CMD_SUBSCRIBE) { + + // If the filter of the specified type does not exist yet, create it. + if (it == filters.end ()) { + filter_t f; + f.type = get_filter (filter_id); + xs_assert (f.type); + f.instance = f.type->sf_create ((void*) (core_t*) this); + xs_assert (f.instance); + filters.push_back (f); + it = filters.end () - 1; + } - // Process the subscription. - if (*data == 1) { - if (subscriptions.add (data + 1, size - 1)) + if (it->type->sf_subscribe ((void*) (core_t*) this, + it->instance, data + 4, size - 4) == 1) +#if 0 + // TODO: This is 0MQ/3.1 protocol. + if (it->type->sf_subscribe ((void*) (core_t*) this, + it->instance, data + 1, size - 1) == 1) +#endif return dist.send_to_all (msg_, flags_); else return 0; } - else if (*data == 0) { - if (subscriptions.rm (data + 1, size - 1)) + else if (cmd == XS_CMD_UNSUBSCRIBE) { + xs_assert (it != filters.end ()); + + if (it->type->sf_unsubscribe ((void*) (core_t*) this, + it->instance, data + 4, size - 4) == 1) +#if 0 + // TODO: This is 0MQ/3.1 protocol. + if (it->type->sf_unsubscribe ((void*) (core_t*) this, + it->instance, data + 1, size - 1) == 1) +#endif return dist.send_to_all (msg_, flags_); else return 0; @@ -208,24 +276,37 @@ bool xs::xsub_t::xhas_in () bool xs::xsub_t::match (msg_t *msg_) { - return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ()); + for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it) + if (it->type->sf_match ((void*) (core_t*) this, it->instance, + (unsigned char*) msg_->data (), msg_->size ())) + return true; + return false; } -void xs::xsub_t::send_subscription (unsigned char *data_, size_t size_, - void *arg_) +int xs::xsub_t::filter_subscribed (const unsigned char *data_, size_t size_) { - pipe_t *pipe = (pipe_t*) arg_; - // Create the subsctription message. msg_t msg; + int rc = msg.init_size (size_ + 4); + xs_assert (rc == 0); + unsigned char *data = (unsigned char*) msg.data (); + put_uint16 (data, XS_CMD_SUBSCRIBE); + put_uint16 (data + 2, tmp_filter_id); + memcpy (data + 4, data_, size_); + +#if 0 + // TODO: This is 0MQ/3.1 protocol. + xs_assert (tmp_filter_id == XS_FILTER_PREFIX); + msg_t msg; int rc = msg.init_size (size_ + 1); xs_assert (rc == 0); unsigned char *data = (unsigned char*) msg.data (); data [0] = 1; memcpy (data + 1, data_, size_); +#endif // Send it to the pipe. - bool sent = pipe->write (&msg); + bool sent = tmp_pipe->write (&msg); // If we reached the SNDHWM, and thus cannot send the subscription, drop // the subscription message instead. This matches the behaviour of @@ -233,6 +314,8 @@ void xs::xsub_t::send_subscription (unsigned char *data_, size_t size_, // when the SNDHWM is reached. if (!sent) msg.close (); + + return 0; } xs::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_, -- cgit v1.2.3