summaryrefslogtreecommitdiff
path: root/src/xsub.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-04-05 07:32:58 +0200
committerMartin Sustrik <sustrik@250bpm.com>2012-04-05 07:32:58 +0200
commitd82cbb3a81f116cd22e9895ecac36ac3d7b38929 (patch)
tree03c923311b937f550bec325d131476513a02bebf /src/xsub.cpp
parent52b8a917deb2990e7197b82e81e0258ebe30f424 (diff)
XS_PLUGIN and XS_FILTER implementation
This patch introduces following features: - XS_PLUGIN context option to add plugins to libxs - XS_FILTER option to switch between different filter types - Automatic loading of plug-ins is *not* implemented. From the implementation point of view: - standard prefix filter is implemented as a pluggable filter - trie_t and mtrie_t are joined into a single class - the code for 0MQ/3.1 compatibility is left in in the form of comments - new test for testing re-subscriptions is added Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xsub.cpp')
-rw-r--r--src/xsub.cpp119
1 files changed, 101 insertions, 18 deletions
diff --git a/src/xsub.cpp b/src/xsub.cpp
index af6789f..da56586 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -23,11 +23,14 @@
#include "xsub.hpp"
#include "err.hpp"
+#include "wire.hpp"
xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
has_message (false),
- more (false)
+ more (false),
+ tmp_pipe (NULL),
+ tmp_filter_id (-1)
{
options.type = XS_XSUB;
@@ -44,6 +47,10 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
xs::xsub_t::~xsub_t ()
{
+ // Deallocate all the filters.
+ for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it)
+ it->type->sf_destroy ((void*) (core_t*) this, it->instance);
+
int rc = message.close ();
errno_assert (rc == 0);
}
@@ -59,8 +66,15 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
dist.attach (pipe_);
// Send all the cached subscriptions to the new upstream peer.
- subscriptions.apply (send_subscription, pipe_);
+ tmp_pipe = pipe_;
+ for (filters_t::iterator it = filters.begin (); it != filters.end ();
+ ++it) {
+ tmp_filter_id = it->type->id (NULL);
+ it->type->sf_enumerate ((void*) (core_t*) this, it->instance);
+ tmp_filter_id = -1;
+ }
pipe_->flush ();
+ tmp_pipe = NULL;
}
void xs::xsub_t::xread_activated (pipe_t *pipe_)
@@ -82,11 +96,17 @@ void xs::xsub_t::xterminated (pipe_t *pipe_)
void xs::xsub_t::xhiccuped (pipe_t *pipe_)
{
+ // Send all the cached subscriptions to the hiccuped pipe.
if (pipe_->get_protocol () != 1) {
-
- // Send all the cached subscriptions to the hiccuped pipe.
- subscriptions.apply (send_subscription, pipe_);
+ tmp_pipe = pipe_;
+ for (filters_t::iterator it = filters.begin (); it != filters.end ();
+ ++it) {
+ tmp_filter_id = it->type->id (NULL);
+ it->type->sf_enumerate ((void*) (core_t*) this, it->instance);
+ tmp_filter_id = -1;
+ }
pipe_->flush ();
+ tmp_pipe = NULL;
}
}
@@ -95,21 +115,69 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)
size_t size = msg_->size ();
unsigned char *data = (unsigned char*) msg_->data ();
- // Malformed subscriptions.
- if (size < 1 || (*data != 0 && *data != 1)) {
+ if (size < 4) {
+ errno = EINVAL;
+ return -1;
+ }
+ int cmd = get_uint16 (data);
+ int filter_id = get_uint16 (data + 2);
+
+#if 0
+ // TODO: This is 0MQ/3.1 protocol.
+ if (size < 1) {
+ errno = EINVAL;
+ return -1;
+ }
+ int cmd = data [0] ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE;
+ int filter_id = XS_FILTER_PREFIX;
+#endif
+
+ if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
errno = EINVAL;
return -1;
}
+
+ // Find the relevant filter.
+ filters_t::iterator it;
+ for (it = filters.begin (); it != filters.end (); ++it)
+ if (it->type->id (NULL) == filter_id)
+ break;
+
+ // Process the subscription.
+ if (cmd == XS_CMD_SUBSCRIBE) {
+
+ // If the filter of the specified type does not exist yet, create it.
+ if (it == filters.end ()) {
+ filter_t f;
+ f.type = get_filter (filter_id);
+ xs_assert (f.type);
+ f.instance = f.type->sf_create ((void*) (core_t*) this);
+ xs_assert (f.instance);
+ filters.push_back (f);
+ it = filters.end () - 1;
+ }
- // Process the subscription.
- if (*data == 1) {
- if (subscriptions.add (data + 1, size - 1))
+ if (it->type->sf_subscribe ((void*) (core_t*) this,
+ it->instance, data + 4, size - 4) == 1)
+#if 0
+ // TODO: This is 0MQ/3.1 protocol.
+ if (it->type->sf_subscribe ((void*) (core_t*) this,
+ it->instance, data + 1, size - 1) == 1)
+#endif
return dist.send_to_all (msg_, flags_);
else
return 0;
}
- else if (*data == 0) {
- if (subscriptions.rm (data + 1, size - 1))
+ else if (cmd == XS_CMD_UNSUBSCRIBE) {
+ xs_assert (it != filters.end ());
+
+ if (it->type->sf_unsubscribe ((void*) (core_t*) this,
+ it->instance, data + 4, size - 4) == 1)
+#if 0
+ // TODO: This is 0MQ/3.1 protocol.
+ if (it->type->sf_unsubscribe ((void*) (core_t*) this,
+ it->instance, data + 1, size - 1) == 1)
+#endif
return dist.send_to_all (msg_, flags_);
else
return 0;
@@ -208,24 +276,37 @@ bool xs::xsub_t::xhas_in ()
bool xs::xsub_t::match (msg_t *msg_)
{
- return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
+ for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it)
+ if (it->type->sf_match ((void*) (core_t*) this, it->instance,
+ (unsigned char*) msg_->data (), msg_->size ()))
+ return true;
+ return false;
}
-void xs::xsub_t::send_subscription (unsigned char *data_, size_t size_,
- void *arg_)
+int xs::xsub_t::filter_subscribed (const unsigned char *data_, size_t size_)
{
- pipe_t *pipe = (pipe_t*) arg_;
-
// Create the subsctription message.
msg_t msg;
+ int rc = msg.init_size (size_ + 4);
+ xs_assert (rc == 0);
+ unsigned char *data = (unsigned char*) msg.data ();
+ put_uint16 (data, XS_CMD_SUBSCRIBE);
+ put_uint16 (data + 2, tmp_filter_id);
+ memcpy (data + 4, data_, size_);
+
+#if 0
+ // TODO: This is 0MQ/3.1 protocol.
+ xs_assert (tmp_filter_id == XS_FILTER_PREFIX);
+ msg_t msg;
int rc = msg.init_size (size_ + 1);
xs_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
data [0] = 1;
memcpy (data + 1, data_, size_);
+#endif
// Send it to the pipe.
- bool sent = pipe->write (&msg);
+ bool sent = tmp_pipe->write (&msg);
// If we reached the SNDHWM, and thus cannot send the subscription, drop
// the subscription message instead. This matches the behaviour of
@@ -233,6 +314,8 @@ void xs::xsub_t::send_subscription (unsigned char *data_, size_t size_,
// when the SNDHWM is reached.
if (!sent)
msg.close ();
+
+ return 0;
}
xs::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_,