summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am8
-rw-r--r--src/core.cpp68
-rw-r--r--src/core.hpp53
-rw-r--r--src/ctx.cpp39
-rw-r--r--src/ctx.hpp14
-rw-r--r--src/mtrie.cpp436
-rw-r--r--src/mtrie.hpp93
-rw-r--r--src/object.cpp5
-rw-r--r--src/object.hpp5
-rw-r--r--src/options.cpp20
-rw-r--r--src/options.hpp3
-rw-r--r--src/prefix_filter.cpp664
-rw-r--r--src/prefix_filter.hpp108
-rw-r--r--src/sub.cpp19
-rw-r--r--src/trie.cpp338
-rw-r--r--src/trie.hpp79
-rw-r--r--src/wire.hpp4
-rw-r--r--src/xpub.cpp153
-rw-r--r--src/xpub.hpp30
-rw-r--r--src/xsub.cpp119
-rw-r--r--src/xsub.hpp29
21 files changed, 1267 insertions, 1020 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index fdfa8d1..3c260be 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -23,6 +23,7 @@ libxs_la_SOURCES = \
clock.hpp \
command.hpp \
config.hpp \
+ core.hpp \
ctx.hpp \
decoder.hpp \
devpoll.hpp \
@@ -44,7 +45,6 @@ libxs_la_SOURCES = \
likely.hpp \
mailbox.hpp \
msg.hpp \
- mtrie.hpp \
mutex.hpp \
object.hpp \
options.hpp \
@@ -56,6 +56,7 @@ libxs_la_SOURCES = \
platform.hpp \
poll.hpp \
pair.hpp \
+ prefix_filter.hpp \
pub.hpp \
pull.hpp \
push.hpp \
@@ -74,7 +75,6 @@ libxs_la_SOURCES = \
tcp_connecter.hpp \
tcp_listener.hpp \
thread.hpp \
- trie.hpp \
upoll.hpp \
windows.hpp \
wire.hpp \
@@ -85,6 +85,7 @@ libxs_la_SOURCES = \
ypipe.hpp \
yqueue.hpp \
clock.cpp \
+ core.cpp \
ctx.cpp \
decoder.cpp \
devpoll.cpp \
@@ -103,7 +104,6 @@ libxs_la_SOURCES = \
lb.cpp \
mailbox.cpp \
msg.cpp \
- mtrie.cpp \
object.cpp \
options.cpp \
own.cpp \
@@ -113,6 +113,7 @@ libxs_la_SOURCES = \
pgm_socket.cpp \
pipe.cpp \
poll.cpp \
+ prefix_filter.cpp \
pull.cpp \
push.cpp \
reaper.cpp \
@@ -130,7 +131,6 @@ libxs_la_SOURCES = \
tcp_connecter.cpp \
tcp_listener.cpp \
thread.cpp \
- trie.cpp \
upoll.cpp \
xpub.cpp \
xrep.cpp \
diff --git a/src/core.cpp b/src/core.cpp
new file mode 100644
index 0000000..2d16167
--- /dev/null
+++ b/src/core.cpp
@@ -0,0 +1,68 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../include/xs.h"
+
+#include "core.hpp"
+#include "err.hpp"
+
+xs::core_t::core_t ()
+{
+}
+
+xs::core_t::~core_t ()
+{
+}
+
+int xs::core_t::filter_subscribed (const unsigned char *data_, size_t size_)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+int xs::core_t::filter_unsubscribed (const unsigned char *data_, size_t size_)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+int xs::core_t::filter_matching (void *subscriber_)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+int xs_filter_subscribed (void *core_,
+ const unsigned char *data_, size_t size_)
+{
+ return ((xs::core_t*) core_)->filter_subscribed (data_, size_);
+}
+
+int xs_filter_unsubscribed (void *core_,
+ const unsigned char *data_, size_t size_)
+{
+ return ((xs::core_t*) core_)->filter_unsubscribed (data_, size_);
+}
+
+int xs_filter_matching (void *core_, void *subscriber_)
+{
+ return ((xs::core_t*) core_)->filter_matching (subscriber_);
+}
+
diff --git a/src/core.hpp b/src/core.hpp
new file mode 100644
index 0000000..fb50646
--- /dev/null
+++ b/src/core.hpp
@@ -0,0 +1,53 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_CORE_HPP_INCLUDED__
+#define __XS_CORE_HPP_INCLUDED__
+
+#include <stddef.h>
+
+namespace xs
+{
+
+ // This class is not a core of Crossroads. It's rather a callback interface
+ // for extensions, ie. what's extensions see as Crossroads core.
+
+ class core_t
+ {
+ public:
+
+ core_t ();
+ virtual ~core_t ();
+
+ virtual int filter_subscribed (const unsigned char *data_,
+ size_t size_);
+ virtual int filter_unsubscribed (const unsigned char *data_,
+ size_t size_);
+ virtual int filter_matching (void *subscriber_);
+
+ private:
+
+ core_t (const core_t&);
+ const core_t &operator = (const core_t&);
+ };
+
+}
+
+#endif
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 21dec58..46fa984 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -35,6 +35,7 @@
#include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp"
+#include "prefix_filter.hpp"
xs::ctx_t::ctx_t () :
tag (0xbadcafe0),
@@ -46,6 +47,9 @@ xs::ctx_t::ctx_t () :
max_sockets (512),
io_thread_count (1)
{
+ // Plug in the standard plugins.
+ int rc = plug (prefix_filter);
+ xs_assert (rc == 0);
}
bool xs::ctx_t::check_tag ()
@@ -124,6 +128,28 @@ int xs::ctx_t::terminate ()
return 0;
}
+int xs::ctx_t::plug (const void *ext_)
+{
+ if (!ext_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ // The extension is a message filter plug-in.
+ xs_filter_t *filter = (xs_filter_t*) ext_;
+ if (filter->type == XS_PLUGIN_FILTER && filter->version == 1) {
+ opt_sync.lock ();
+ filters [filter->id (NULL)] = filter;
+ opt_sync.unlock ();
+ return 0;
+ }
+
+ // Specified extension type is not supported by this version of
+ // the library.
+ errno = ENOTSUP;
+ return -1;
+}
+
int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_)
{
switch (option_) {
@@ -145,6 +171,8 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_)
io_thread_count = *((int*) optval_);
opt_sync.unlock ();
break;
+ case XS_PLUGIN:
+ return plug (optval_);
default:
errno = EINVAL;
return -1;
@@ -256,6 +284,17 @@ xs::object_t *xs::ctx_t::get_reaper ()
return reaper;
}
+xs_filter_t *xs::ctx_t::get_filter (int filter_id_)
+{
+ xs_filter_t *result = NULL;
+ opt_sync.lock ();
+ filters_t::iterator it = filters.find (filter_id_);
+ if (it != filters.end ())
+ result = it->second;
+ opt_sync.unlock ();
+ return result;
+}
+
void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{
slots [tid_]->send (command_);
diff --git a/src/ctx.hpp b/src/ctx.hpp
index c4fa96a..54144ad 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -25,7 +25,8 @@
#include <map>
#include <vector>
#include <string>
-#include <stdarg.h>
+
+#include "../include/xs.h"
#include "mailbox.hpp"
#include "array.hpp"
@@ -88,6 +89,10 @@ namespace xs
// Returns reaper thread object.
xs::object_t *get_reaper ();
+ // Get the filter associated with the specified filter ID or NULL
+ // If such filter is not registered.
+ xs_filter_t *get_filter (int filter_id_);
+
// Management of inproc endpoints.
int register_endpoint (const char *addr_, endpoint_t &endpoint_);
void unregister_endpoints (xs::socket_base_t *socket_);
@@ -102,6 +107,9 @@ namespace xs
private:
+ // Plug in the extension specified.
+ int plug (const void *ext_);
+
// Used to check whether the object is a context.
uint32_t tag;
@@ -161,6 +169,10 @@ namespace xs
// Synchronisation of access to context options.
mutex_t opt_sync;
+ // List of all filters plugged into the context.
+ typedef std::map <int, xs_filter_t*> filters_t;
+ filters_t filters;
+
ctx_t (const ctx_t&);
const ctx_t &operator = (const ctx_t&);
};
diff --git a/src/mtrie.cpp b/src/mtrie.cpp
deleted file mode 100644
index eae34c2..0000000
--- a/src/mtrie.cpp
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- Copyright (c) 2011-2012 250bpm s.r.o.
- Copyright (c) 2011-2012 Spotify AB
- Copyright (c) 2011 Other contributors as noted in the AUTHORS file
-
- This file is part of Crossroads I/O project.
-
- Crossroads I/O is free software; you can redistribute it and/or modify it
- under the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- Crossroads is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <stdlib.h>
-
-#include <new>
-#include <algorithm>
-
-#include "platform.hpp"
-#if defined XS_HAVE_WINDOWS
-#include "windows.hpp"
-#endif
-
-#include "err.hpp"
-#include "pipe.hpp"
-#include "mtrie.hpp"
-
-xs::mtrie_t::mtrie_t () :
- pipes (0),
- min (0),
- count (0),
- live_nodes (0)
-{
-}
-
-xs::mtrie_t::~mtrie_t ()
-{
- if (pipes) {
- delete pipes;
- pipes = 0;
- }
-
- if (count == 1) {
- xs_assert (next.node);
- delete next.node;
- next.node = 0;
- }
- else if (count > 1) {
- for (unsigned short i = 0; i != count; ++i)
- if (next.table [i])
- delete next.table [i];
- free (next.table);
- }
-}
-
-bool xs::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_)
-{
- return add_helper (prefix_, size_, pipe_);
-}
-
-bool xs::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
- pipe_t *pipe_)
-{
- // We are at the node corresponding to the prefix. We are done.
- if (!size_) {
- bool result = !pipes;
- if (!pipes)
- pipes = new pipes_t;
- pipes->insert (pipe_);
- return result;
- }
-
- unsigned char c = *prefix_;
- if (c < min || c >= min + count) {
-
- // The character is out of range of currently handled
- // charcters. We have to extend the table.
- if (!count) {
- min = c;
- count = 1;
- next.node = NULL;
- }
- else if (count == 1) {
- unsigned char oldc = min;
- mtrie_t *oldp = next.node;
- count = (min < c ? c - min : min - c) + 1;
- next.table = (mtrie_t**)
- malloc (sizeof (mtrie_t*) * count);
- xs_assert (next.table);
- for (unsigned short i = 0; i != count; ++i)
- next.table [i] = 0;
- min = std::min (min, c);
- next.table [oldc - min] = oldp;
- }
- else if (min < c) {
-
- // The new character is above the current character range.
- unsigned short old_count = count;
- count = c - min + 1;
- next.table = (mtrie_t**) realloc ((void*) next.table,
- sizeof (mtrie_t*) * count);
- xs_assert (next.table);
- for (unsigned short i = old_count; i != count; i++)
- next.table [i] = NULL;
- }
- else {
-
- // The new character is below the current character range.
- unsigned short old_count = count;
- count = (min + old_count) - c;
- next.table = (mtrie_t**) realloc ((void*) next.table,
- sizeof (mtrie_t*) * count);
- xs_assert (next.table);
- memmove (next.table + min - c, next.table,
- old_count * sizeof (mtrie_t*));
- for (unsigned short i = 0; i != min - c; i++)
- next.table [i] = NULL;
- min = c;
- }
- }
-
- // If next node does not exist, create one.
- if (count == 1) {
- if (!next.node) {
- next.node = new (std::nothrow) mtrie_t;
- ++live_nodes;
- xs_assert (next.node);
- }
- return next.node->add_helper (prefix_ + 1, size_ - 1, pipe_);
- }
- else {
- if (!next.table [c - min]) {
- next.table [c - min] = new (std::nothrow) mtrie_t;
- ++live_nodes;
- xs_assert (next.table [c - min]);
- }
- return next.table [c - min]->add_helper (prefix_ + 1, size_ - 1, pipe_);
- }
-}
-
-
-void xs::mtrie_t::rm (pipe_t *pipe_,
- void (*func_) (unsigned char *data_, size_t size_, void *arg_),
- void *arg_)
-{
- unsigned char *buff = NULL;
- rm_helper (pipe_, &buff, 0, 0, func_, arg_);
- free (buff);
-}
-
-void xs::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
- size_t buffsize_, size_t maxbuffsize_,
- void (*func_) (unsigned char *data_, size_t size_, void *arg_),
- void *arg_)
-{
- // Remove the subscription from this node.
- if (pipes && pipes->erase (pipe_) && pipes->empty ()) {
- func_ (*buff_, buffsize_, arg_);
- delete pipes;
- pipes = 0;
- }
-
- // Adjust the buffer.
- if (buffsize_ >= maxbuffsize_) {
- maxbuffsize_ = buffsize_ + 256;
- *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
- alloc_assert (*buff_);
- }
-
- // If there are no subnodes in the trie, return.
- if (count == 0)
- return;
-
- // If there's one subnode (optimisation).
- if (count == 1) {
- (*buff_) [buffsize_] = min;
- buffsize_++;
- next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_,
- func_, arg_);
-
- // Prune the node if it was made redundant by the removal
- if (next.node->is_redundant ()) {
- delete next.node;
- next.node = 0;
- count = 0;
- --live_nodes;
- xs_assert (live_nodes == 0);
- }
- return;
- }
-
- // If there are multiple subnodes.
- //
- // New min non-null character in the node table after the removal
- unsigned char new_min = min + count - 1;
- // New max non-null character in the node table after the removal
- unsigned char new_max = min;
- for (unsigned short c = 0; c != count; c++) {
- (*buff_) [buffsize_] = min + c;
- if (next.table [c]) {
- next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1,
- maxbuffsize_, func_, arg_);
-
- // Prune redundant nodes from the mtrie
- if (next.table [c]->is_redundant ()) {
- delete next.table [c];
- next.table [c] = 0;
-
- xs_assert (live_nodes > 0);
- --live_nodes;
- }
- else {
- // The node is not redundant, so it's a candidate for being
- // the new min/max node.
- //
- // We loop through the node array from left to right, so the
- // first non-null, non-redundant node encountered is the new
- // minimum index. Conversely, the last non-redundant, non-null
- // node encountered is the new maximum index.
- if (c + min < new_min)
- new_min = c + min;
- if (c + min > new_max)
- new_max = c + min;
- }
- }
- }
-
- xs_assert (count > 1);
-
- // Compact the node table if possible
- if (live_nodes == 1) {
- // If there's only one live node in the table we can
- // switch to using the more compact single-node
- // representation
- xs_assert (new_min == new_max);
- xs_assert (new_min >= min && new_min < min + count);
- mtrie_t *node = next.table [new_min - min];
- xs_assert (node);
- free (next.table);
- next.node = node;
- count = 1;
- min = new_min;
- }
- else if (live_nodes > 1 && (new_min > min || new_max < min + count - 1)) {
- xs_assert (new_max - new_min + 1 > 1);
-
- mtrie_t **old_table = next.table;
- xs_assert (new_min > min || new_max < min + count - 1);
- xs_assert (new_min >= min);
- xs_assert (new_max <= min + count - 1);
- xs_assert (new_max - new_min + 1 < count);
-
- count = new_max - new_min + 1;
- next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
- xs_assert (next.table);
-
- memmove (next.table, old_table + (new_min - min),
- sizeof (mtrie_t*) * count);
- free (old_table);
-
- min = new_min;
- }
-}
-
-bool xs::mtrie_t::rm (unsigned char *prefix_, size_t size_, pipe_t *pipe_)
-{
- return rm_helper (prefix_, size_, pipe_);
-}
-
-bool xs::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_,
- pipe_t *pipe_)
-{
- if (!size_) {
- if (pipes) {
- pipes_t::size_type erased = pipes->erase (pipe_);
- xs_assert (erased == 1);
- if (pipes->empty ()) {
- delete pipes;
- pipes = 0;
- }
- }
- return !pipes;
- }
-
- unsigned char c = *prefix_;
- if (!count || c < min || c >= min + count)
- return false;
-
- mtrie_t *next_node =
- count == 1 ? next.node : next.table [c - min];
-
- if (!next_node)
- return false;
-
- bool ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_);
-
- if (next_node->is_redundant ()) {
- delete next_node;
- xs_assert (count > 0);
-
- if (count == 1) {
- next.node = 0;
- count = 0;
- --live_nodes;
- xs_assert (live_nodes == 0);
- }
- else {
- next.table [c - min] = 0;
- xs_assert (live_nodes > 1);
- --live_nodes;
-
- // Compact the table if possible
- if (live_nodes == 1) {
- // If there's only one live node in the table we can
- // switch to using the more compact single-node
- // representation
- mtrie_t *node = 0;
- for (unsigned short i = 0; i < count; ++i) {
- if (next.table [i]) {
- node = next.table [i];
- min = i + min;
- break;
- }
- }
-
- xs_assert (node);
- free (next.table);
- next.node = node;
- count = 1;
- }
- else if (c == min) {
- // We can compact the table "from the left"
- unsigned char new_min = min;
- for (unsigned short i = 1; i < count; ++i) {
- if (next.table [i]) {
- new_min = i + min;
- break;
- }
- }
- xs_assert (new_min != min);
-
- mtrie_t **old_table = next.table;
- xs_assert (new_min > min);
- xs_assert (count > new_min - min);
-
- count = count - (new_min - min);
- next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
- xs_assert (next.table);
-
- memmove (next.table, old_table + (new_min - min),
- sizeof (mtrie_t*) * count);
- free (old_table);
-
- min = new_min;
- }
- else if (c == min + count - 1) {
- // We can compact the table "from the right"
- unsigned short new_count = count;
- for (unsigned short i = 1; i < count; ++i) {
- if (next.table [count - 1 - i]) {
- new_count = count - i;
- break;
- }
- }
- xs_assert (new_count != count);
- count = new_count;
-
- mtrie_t **old_table = next.table;
- next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
- xs_assert (next.table);
-
- memmove (next.table, old_table, sizeof (mtrie_t*) * count);
- free (old_table);
- }
- }
- }
-
- return ret;
-}
-
-void xs::mtrie_t::match (unsigned char *data_, size_t size_,
- void (*func_) (pipe_t *pipe_, void *arg_), void *arg_)
-{
- mtrie_t *current = this;
- while (true) {
-
- // Signal the pipes attached to this node.
- if (current->pipes) {
- for (pipes_t::iterator it = current->pipes->begin ();
- it != current->pipes->end (); ++it)
- func_ (*it, arg_);
- }
-
- // If we are at the end of the message, there's nothing more to match.
- if (!size_)
- break;
-
- // If there are no subnodes in the trie, return.
- if (current->count == 0)
- break;
-
- // If there's one subnode (optimisation).
- if (current->count == 1) {
- if (data_ [0] != current->min)
- break;
- current = current->next.node;
- data_++;
- size_--;
- continue;
- }
-
- // If there are multiple subnodes.
- if (data_ [0] < current->min || data_ [0] >=
- current->min + current->count)
- break;
- if (!current->next.table [data_ [0] - current->min])
- break;
- current = current->next.table [data_ [0] - current->min];
- data_++;
- size_--;
- }
-}
-
-bool xs::mtrie_t::is_redundant () const
-{
- return !pipes && live_nodes == 0;
-}
-
diff --git a/src/mtrie.hpp b/src/mtrie.hpp
deleted file mode 100644
index 1e56b4e..0000000
--- a/src/mtrie.hpp
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- Copyright (c) 2011-2012 250bpm s.r.o.
- Copyright (c) 2011-2012 Spotify AB
- Copyright (c) 2011 Other contributors as noted in the AUTHORS file
-
- This file is part of Crossroads I/O project.
-
- Crossroads I/O is free software; you can redistribute it and/or modify it
- under the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- Crossroads is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __XS_MTRIE_HPP_INCLUDED__
-#define __XS_MTRIE_HPP_INCLUDED__
-
-#include <stddef.h>
-#include <set>
-
-#include "stdint.hpp"
-
-namespace xs
-{
-
- class pipe_t;
-
- // Multi-trie. Each node in the trie is a set of pointers to pipes.
-
- class mtrie_t
- {
- public:
-
- mtrie_t ();
- ~mtrie_t ();
-
- // Add key to the trie. Returns true if it's a new subscription
- // rather than a duplicate.
- bool add (unsigned char *prefix_, size_t size_, xs::pipe_t *pipe_);
-
- // Remove all subscriptions for a specific peer from the trie.
- // If there are no subscriptions left on some topics, invoke the
- // supplied callback function.
- void rm (xs::pipe_t *pipe_,
- void (*func_) (unsigned char *data_, size_t size_, void *arg_),
- void *arg_);
-
- // Remove specific subscription from the trie. Return true is it was
- // actually removed rather than de-duplicated.
- bool rm (unsigned char *prefix_, size_t size_, xs::pipe_t *pipe_);
-
- // Signal all the matching pipes.
- void match (unsigned char *data_, size_t size_,
- void (*func_) (xs::pipe_t *pipe_, void *arg_), void *arg_);
-
- private:
-
- bool add_helper (unsigned char *prefix_, size_t size_,
- xs::pipe_t *pipe_);
- void rm_helper (xs::pipe_t *pipe_, unsigned char **buff_,
- size_t buffsize_, size_t maxbuffsize_,
- void (*func_) (unsigned char *data_, size_t size_, void *arg_),
- void *arg_);
- bool rm_helper (unsigned char *prefix_, size_t size_,
- xs::pipe_t *pipe_);
- bool is_redundant () const;
-
- typedef std::set <xs::pipe_t*> pipes_t;
- pipes_t *pipes;
-
- unsigned char min;
- unsigned short count;
- unsigned short live_nodes;
- union {
- class mtrie_t *node;
- class mtrie_t **table;
- } next;
-
- mtrie_t (const mtrie_t&);
- const mtrie_t &operator = (const mtrie_t&);
- };
-
-}
-
-#endif
-
diff --git a/src/object.cpp b/src/object.cpp
index 5c1ed84..4f04fe4 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -154,6 +154,11 @@ xs::io_thread_t *xs::object_t::choose_io_thread (uint64_t affinity_)
return ctx->choose_io_thread (affinity_);
}
+xs_filter_t *xs::object_t::get_filter (int filter_id_)
+{
+ return ctx->get_filter (filter_id_);
+}
+
void xs::object_t::send_stop ()
{
// 'stop' command goes always from administrative thread to
diff --git a/src/object.hpp b/src/object.hpp
index 5b855a5..fabb156 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -22,6 +22,8 @@
#ifndef __XS_OBJECT_HPP_INCLUDED__
#define __XS_OBJECT_HPP_INCLUDED__
+#include "../include/xs.h"
+
#include "stdint.hpp"
namespace xs
@@ -64,6 +66,9 @@ namespace xs
// Chooses least loaded I/O thread.
xs::io_thread_t *choose_io_thread (uint64_t affinity_);
+ // Functions related to extensions.
+ xs_filter_t *get_filter (int filter_id_);
+
// Derived object can use these functions to send commands
// to other objects.
void send_stop ();
diff --git a/src/options.cpp b/src/options.cpp
index c9cbaae..fdbffde 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -23,6 +23,8 @@
#include <string.h>
#include <limits>
+#include "../include/xs.h"
+
#include "options.hpp"
#include "err.hpp"
@@ -47,6 +49,7 @@ xs::options_t::options_t () :
ipv4only (1),
keepalive (0),
protocol (0),
+ filter_id (XS_FILTER_PREFIX),
delay_on_close (true),
delay_on_disconnect (true),
filter (false),
@@ -248,6 +251,14 @@ int xs::options_t::setsockopt (int option_, const void *optval_,
return 0;
}
+ case XS_FILTER:
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ filter_id = *((int*) optval_);
+ return 0;
+
}
errno = EINVAL;
@@ -438,6 +449,15 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int);
return 0;
+ case XS_FILTER:
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = filter_id;
+ *optvallen_ = sizeof (int);
+ return 0;
+
}
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index c1e4dda..1288f72 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -95,6 +95,9 @@ namespace xs
// Version of wire protocol to use.
int protocol;
+ // Filter ID to be used with subscriptions and unsubscriptions.
+ int filter_id;
+
// If true, session reads all the pending messages from the pipe and
// sends them to the network when socket is closed.
bool delay_on_close;
diff --git a/src/prefix_filter.cpp b/src/prefix_filter.cpp
new file mode 100644
index 0000000..5d21fb6
--- /dev/null
+++ b/src/prefix_filter.cpp
@@ -0,0 +1,664 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2011-2012 Spotify AB
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <new>
+
+#include "../include/xs.h"
+
+#include "prefix_filter.hpp"
+#include "err.hpp"
+
+xs::prefix_filter_t::prefix_filter_t ()
+{
+ init (&root);
+}
+
+xs::prefix_filter_t::~prefix_filter_t ()
+{
+ close (&root);
+}
+
+int xs::prefix_filter_t::subscribe (void *core_, void *subscriber_,
+ const unsigned char *data_, size_t size_)
+{
+ return add (&root, data_, size_, subscriber_) ? 1 : 0;
+}
+
+int xs::prefix_filter_t::unsubscribe (void *core_, void *subscriber_,
+ const unsigned char *data_, size_t size_)
+{
+ return rm (&root, data_, size_, subscriber_) ? 1 : 0;
+}
+
+void xs::prefix_filter_t::unsubscribe_all (void *core_, void *subscriber_)
+{
+ rm (&root, subscriber_, core_);
+}
+
+void xs::prefix_filter_t::enumerate (void *core_)
+{
+ unsigned char *buff = NULL;
+ list (&root, &buff, 0, 0, core_);
+ free (buff);
+}
+
+int xs::prefix_filter_t::match (void *core_,
+ const unsigned char *data_, size_t size_)
+{
+ // This function is on critical path. It deliberately doesn't use
+ // recursion to get a bit better performance.
+ node_t *current = &root;
+ while (true) {
+
+ // We've found a corresponding subscription!
+ if (current->subscribers)
+ return 1;
+
+ // We've checked all the data and haven't found matching subscription.
+ if (!size_)
+ return 0;
+
+ // If there's no corresponding slot for the first character
+ // of the prefix, the message does not match.
+ unsigned char c = *data_;
+ if (c < current->min || c >= current->min + current->count)
+ return 0;
+
+ // Move to the next character.
+ if (current->count == 1)
+ current = current->next.node;
+ else {
+ current = current->next.table [c - current->min];
+ if (!current)
+ return 0;
+ }
+ data_++;
+ size_--;
+ }
+
+}
+
+void xs::prefix_filter_t::match_all (void *core_,
+ const unsigned char *data_, size_t size_)
+{
+ node_t *current = &root;
+ while (true) {
+
+ // Signal the subscribers attached to this node.
+ if (current->subscribers) {
+ for (node_t::subscribers_t::iterator it =
+ current->subscribers->begin ();
+ it != current->subscribers->end (); ++it) {
+ int rc = xs_filter_matching (core_, it->first);
+ errno_assert (rc == 0);
+ }
+ }
+
+ // If we are at the end of the message, there's nothing more to match.
+ if (!size_)
+ break;
+
+ // If there are no subnodes in the trie, return.
+ if (current->count == 0)
+ break;
+
+ // If there's one subnode (optimisation).
+ if (current->count == 1) {
+ if (data_ [0] != current->min)
+ break;
+ current = current->next.node;
+ data_++;
+ size_--;
+ continue;
+ }
+
+ // If there are multiple subnodes.
+ if (data_ [0] < current->min || data_ [0] >=
+ current->min + current->count)
+ break;
+ if (!current->next.table [data_ [0] - current->min])
+ break;
+ current = current->next.table [data_ [0] - current->min];
+ data_++;
+ size_--;
+ }
+}
+
+void xs::prefix_filter_t::init (node_t *node_)
+{
+ node_->subscribers = NULL;
+ node_->min = 0;
+ node_->count = 0;
+ node_->live_nodes = 0;
+}
+
+void xs::prefix_filter_t::close (node_t *node_)
+{
+ if (node_->subscribers) {
+ delete node_->subscribers;
+ node_->subscribers = NULL;
+ }
+
+ if (node_->count == 1) {
+ xs_assert (node_->next.node);
+ close (node_->next.node);
+ delete node_->next.node;
+ node_->next.node = NULL;
+ }
+ else if (node_->count > 1) {
+ for (unsigned short i = 0; i != node_->count; ++i)
+ if (node_->next.table [i]) {
+ close (node_->next.table [i]);
+ delete node_->next.table [i];
+ }
+ free (node_->next.table);
+ }
+}
+
+bool xs::prefix_filter_t::add (node_t *node_, const unsigned char *prefix_,
+ size_t size_, void *subscriber_)
+{
+ // We are at the node corresponding to the prefix. We are done.
+ if (!size_) {
+ bool result = !node_->subscribers;
+ if (!node_->subscribers)
+ node_->subscribers = new (std::nothrow) node_t::subscribers_t;
+ node_t::subscribers_t::iterator it = node_->subscribers->insert (
+ node_t::subscribers_t::value_type (subscriber_, 0)).first;
+ ++it->second;
+ return result;
+ }
+
+ unsigned char c = *prefix_;
+ if (c < node_->min || c >= node_->min + node_->count) {
+
+ // The character is out of range of currently handled
+ // charcters. We have to extend the table.
+ if (!node_->count) {
+ node_->min = c;
+ node_->count = 1;
+ node_->next.node = NULL;
+ }
+ else if (node_->count == 1) {
+ unsigned char oldc = node_->min;
+ node_t *oldp = node_->next.node;
+ node_->count =
+ (node_->min < c ? c - node_->min : node_->min - c) + 1;
+ node_->next.table = (node_t**)
+ malloc (sizeof (node_t*) * node_->count);
+ xs_assert (node_->next.table);
+ for (unsigned short i = 0; i != node_->count; ++i)
+ node_->next.table [i] = 0;
+ node_->min = std::min (node_->min, c);
+ node_->next.table [oldc - node_->min] = oldp;
+ }
+ else if (node_->min < c) {
+
+ // The new character is above the current character range.
+ unsigned short old_count = node_->count;
+ node_->count = c - node_->min + 1;
+ node_->next.table = (node_t**) realloc ((void*) node_->next.table,
+ sizeof (node_t*) * node_->count);
+ xs_assert (node_->next.table);
+ for (unsigned short i = old_count; i != node_->count; i++)
+ node_->next.table [i] = NULL;
+ }
+ else {
+
+ // The new character is below the current character range.
+ unsigned short old_count = node_->count;
+ node_->count = (node_->min + old_count) - c;
+ node_->next.table = (node_t**) realloc ((void*) node_->next.table,
+ sizeof (node_t*) * node_->count);
+ xs_assert (node_->next.table);
+ memmove (node_->next.table + node_->min - c, node_->next.table,
+ old_count * sizeof (node_t*));
+ for (unsigned short i = 0; i != node_->min - c; i++)
+ node_->next.table [i] = NULL;
+ node_->min = c;
+ }
+ }
+
+ // If next node does not exist, create one.
+ if (node_->count == 1) {
+ if (!node_->next.node) {
+ node_->next.node = new (std::nothrow) node_t;
+ alloc_assert (node_->next.node);
+ init (node_->next.node);
+ ++node_->live_nodes;
+ xs_assert (node_->next.node);
+ }
+ return add (node_->next.node, prefix_ + 1, size_ - 1, subscriber_);
+ }
+ else {
+ if (!node_->next.table [c - node_->min]) {
+ node_->next.table [c - node_->min] = new (std::nothrow) node_t;
+ alloc_assert (node_->next.table [c - node_->min]);
+ init (node_->next.table [c - node_->min]);
+ ++node_->live_nodes;
+ xs_assert (node_->next.table [c - node_->min]);
+ }
+ return add (node_->next.table [c - node_->min], prefix_ + 1, size_ - 1,
+ subscriber_);
+ }
+}
+
+
+void xs::prefix_filter_t::rm (node_t *node_, void *subscriber_, void *arg_)
+{
+ unsigned char *buff = NULL;
+ rm_helper (node_, subscriber_, &buff, 0, 0, arg_);
+ free (buff);
+}
+
+void xs::prefix_filter_t::rm_helper (node_t *node_, void *subscribers_,
+ unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_, void *arg_)
+{
+ // Remove the subscription from this node.
+ if (node_->subscribers) {
+ node_t::subscribers_t::iterator it =
+ node_->subscribers->find (subscribers_);
+ if (it != node_->subscribers->end ()) {
+ xs_assert (it->second);
+ --it->second;
+ if (!it->second) {
+ node_->subscribers->erase (it);
+ if (node_->subscribers->empty ()) {
+ int rc = xs_filter_unsubscribed (arg_, *buff_, buffsize_);
+ errno_assert (rc == 0);
+ delete node_->subscribers;
+ node_->subscribers = 0;
+ }
+ }
+ }
+ }
+
+ // Adjust the buffer.
+ if (buffsize_ >= maxbuffsize_) {
+ maxbuffsize_ = buffsize_ + 256;
+ *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
+ alloc_assert (*buff_);
+ }
+
+ // If there are no subnodes in the trie, return.
+ if (node_->count == 0)
+ return;
+
+ // If there's one subnode (optimisation).
+ if (node_->count == 1) {
+ (*buff_) [buffsize_] = node_->min;
+ buffsize_++;
+ rm_helper (node_->next.node, subscribers_, buff_, buffsize_,
+ maxbuffsize_, arg_);
+
+ // Prune the node if it was made redundant by the removal
+ if (is_redundant (node_->next.node)) {
+ close (node_->next.node);
+ delete node_->next.node;
+ node_->next.node = 0;
+ node_->count = 0;
+ --node_->live_nodes;
+ xs_assert (node_->live_nodes == 0);
+ }
+ return;
+ }
+
+ // If there are multiple subnodes.
+ //
+ // New min non-null character in the node table after the removal.
+ unsigned char new_min = node_->min;
+ // New max non-null character in the node table after the removal.
+ unsigned char new_max = node_->min + node_->count - 1;
+ for (unsigned short c = 0; c != node_->count; c++) {
+ (*buff_) [buffsize_] = node_->min + c;
+ if (node_->next.table [c]) {
+ rm_helper (node_->next.table [c], subscribers_, buff_,
+ buffsize_ + 1, maxbuffsize_, arg_);
+
+ // Prune redudant nodes from the the trie.
+ if (is_redundant (node_->next.table [c])) {
+ close (node_->next.table [c]);
+ delete node_->next.table [c];
+ node_->next.table [c] = 0;
+
+ xs_assert (node_->live_nodes > 0);
+ --node_->live_nodes;
+ }
+ else {
+ if (c + node_->min > new_min)
+ new_min = c + node_->min;
+ if (c + node_->min < new_max)
+ new_max = c + node_->min;
+ }
+ }
+ }
+
+ xs_assert (node_->count > 1);
+
+ // Compact the node table if possible.
+ if (node_->live_nodes == 1) {
+
+ // If there's only one live node in the table we can
+ // switch to using the more compact single-node
+ // representation.
+ xs_assert (new_min == new_max);
+ xs_assert (new_min >= node_->min &&
+ new_min < node_->min + node_->count);
+ node_t *node = node_->next.table [new_min - node_->min];
+ xs_assert (node);
+ free (node_->next.table);
+ node_->next.node = node;
+ node_->count = 1;
+ node_->min = new_min;
+ }
+ else if (node_->live_nodes > 1 &&
+ (new_min > node_->min || new_max < node_->min + node_->count - 1)) {
+ xs_assert (new_max - new_min + 1 > 1);
+
+ node_t **old_table = node_->next.table;
+ xs_assert (new_min > node_->min ||
+ new_max < node_->min + node_->count - 1);
+ xs_assert (new_min >= node_->min);
+ xs_assert (new_max <= node_->min + node_->count - 1);
+ xs_assert (new_max - new_min + 1 < node_->count);
+
+ node_->count = new_max - new_min + 1;
+ node_->next.table = (node_t**) malloc (sizeof (node_t*) * node_->count);
+ xs_assert (node_->next.table);
+
+ memmove (node_->next.table, old_table + (new_min - node_->min),
+ sizeof (node_t*) * node_->count);
+ free (old_table);
+
+ node_->min = new_min;
+ }
+}
+
+bool xs::prefix_filter_t::rm (node_t *node_, const unsigned char *prefix_,
+ size_t size_, void *subscriber_)
+{
+ if (!size_) {
+
+ // Remove the subscription from this node.
+ if (node_->subscribers) {
+ node_t::subscribers_t::iterator it =
+ node_->subscribers->find (subscriber_);
+ if (it != node_->subscribers->end ()) {
+ xs_assert (it->second);
+ --it->second;
+ if (!it->second) {
+ node_->subscribers->erase (it);
+ if (node_->subscribers->empty ()) {
+ delete node_->subscribers;
+ node_->subscribers = 0;
+ }
+ }
+ }
+ }
+ return !node_->subscribers;
+ }
+
+ unsigned char c = *prefix_;
+ if (!node_->count || c < node_->min || c >= node_->min + node_->count)
+ return false;
+
+ node_t *next_node = node_->count == 1 ? node_->next.node :
+ node_->next.table [c - node_->min];
+
+ if (!next_node)
+ return false;
+
+ bool ret = rm (next_node, prefix_ + 1, size_ - 1, subscriber_);
+
+ if (is_redundant (next_node)) {
+ close (next_node);
+ delete next_node;
+ xs_assert (node_->count > 0);
+
+ if (node_->count == 1) {
+ node_->next.node = 0;
+ node_->count = 0;
+ --node_->live_nodes;
+ xs_assert (node_->live_nodes == 0);
+ }
+ else {
+ node_->next.table [c - node_->min] = 0;
+ xs_assert (node_->live_nodes > 1);
+ --node_->live_nodes;
+
+ // Compact the table if possible.
+ if (node_->live_nodes == 1) {
+ // If there's only one live node in the table we can
+ // switch to using the more compact single-node
+ // representation
+ node_t *node = 0;
+ for (unsigned short i = 0; i < node_->count; ++i) {
+ if (node_->next.table [i]) {
+ node = node_->next.table [i];
+ node_->min += i;
+ break;
+ }
+ }
+
+ xs_assert (node);
+ free (node_->next.table);
+ node_->next.node = node;
+ node_->count = 1;
+ }
+ else if (c == node_->min) {
+
+ // We can compact the table "from the left".
+ unsigned char new_min = node_->min;
+ for (unsigned short i = 1; i < node_->count; ++i) {
+ if (node_->next.table [i]) {
+ new_min = i + node_->min;
+ break;
+ }
+ }
+ xs_assert (new_min != node_->min);
+
+ node_t **old_table = node_->next.table;
+ xs_assert (new_min > node_->min);
+ xs_assert (node_->count > new_min - node_->min);
+
+ node_->count = node_->count - (new_min - node_->min);
+ node_->next.table =
+ (node_t**) malloc (sizeof (node_t*) * node_->count);
+ xs_assert (node_->next.table);
+
+ memmove (node_->next.table, old_table + (new_min - node_->min),
+ sizeof (node_t*) * node_->count);
+ free (old_table);
+
+ node_->min = new_min;
+ }
+ else if (c == node_->min + node_->count - 1) {
+
+ // We can compact the table "from the right".
+ unsigned short new_count = node_->count;
+ for (unsigned short i = 1; i < node_->count; ++i) {
+ if (node_->next.table [node_->count - 1 - i]) {
+ new_count = node_->count - i;
+ break;
+ }
+ }
+ xs_assert (new_count != node_->count);
+ node_->count = new_count;
+
+ node_t **old_table = node_->next.table;
+ node_->next.table =
+ (node_t**) malloc (sizeof (node_t*) * node_->count);
+ xs_assert (node_->next.table);
+
+ memmove (node_->next.table, old_table,
+ sizeof (node_t*) * node_->count);
+ free (old_table);
+ }
+ }
+ }
+
+ return ret;
+}
+
+void xs::prefix_filter_t::list (node_t *node_, unsigned char **buff_,
+ size_t buffsize_, size_t maxbuffsize_, void *arg_)
+{
+ // If this node is a subscription, apply the function.
+ if (node_->subscribers) {
+ int rc = xs_filter_subscribed (arg_, *buff_, buffsize_);
+ errno_assert (rc == 0);
+ }
+
+ // Adjust the buffer.
+ if (buffsize_ >= maxbuffsize_) {
+ maxbuffsize_ = buffsize_ + 256;
+ *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
+ xs_assert (*buff_);
+ }
+
+ // If there are no subnodes in the trie, return.
+ if (node_->count == 0)
+ return;
+
+ // If there's one subnode (optimisation).
+ if (node_->count == 1) {
+ (*buff_) [buffsize_] = node_->min;
+ buffsize_++;
+ list (node_->next.node, buff_, buffsize_, maxbuffsize_, arg_);
+ return;
+ }
+
+ // If there are multiple subnodes.
+ for (unsigned short c = 0; c != node_->count; c++) {
+ (*buff_) [buffsize_] = node_->min + c;
+ if (node_->next.table [c])
+ list (node_->next.table [c], buff_, buffsize_ + 1, maxbuffsize_,
+ arg_);
+ }
+}
+
+bool xs::prefix_filter_t::is_redundant (node_t *node_)
+{
+ return !node_->subscribers && node_->live_nodes == 0;
+}
+
+// Implementation of the C interface of the filter.
+// Following functions convert raw C calls into calls to C++ object methods.
+
+static int id (void *core_)
+{
+ return XS_FILTER_PREFIX;
+}
+
+static void *pf_create (void *core_)
+{
+ void *pf = (void*) new (std::nothrow) xs::prefix_filter_t;
+ alloc_assert (pf);
+ return pf;
+}
+
+static void pf_destroy (void *core_, void *pf_)
+{
+ delete (xs::prefix_filter_t*) pf_;
+}
+
+static int pf_subscribe (void *core_, void *pf_, void *subscriber_,
+ const unsigned char *data_, size_t size_)
+{
+ return ((xs::prefix_filter_t*) pf_)->subscribe (core_, subscriber_,
+ data_, size_);
+}
+
+static int pf_unsubscribe (void *core_, void *pf_, void *subscriber_,
+ const unsigned char *data_, size_t size_)
+{
+ return ((xs::prefix_filter_t*) pf_)->unsubscribe (core_, subscriber_,
+ data_, size_);
+}
+
+static void pf_unsubscribe_all (void *core_, void *pf_, void *subscriber_)
+{
+ ((xs::prefix_filter_t*) pf_)->unsubscribe_all (core_, subscriber_);
+}
+
+static void pf_match (void *core_, void *pf_,
+ const unsigned char *data_, size_t size_)
+{
+ ((xs::prefix_filter_t*) pf_)->match_all (core_, data_, size_);
+}
+
+static void *sf_create (void *core_)
+{
+ void *sf = (void*) new (std::nothrow) xs::prefix_filter_t;
+ alloc_assert (sf);
+ return sf;
+}
+
+static void sf_destroy (void *core_, void *sf_)
+{
+ delete (xs::prefix_filter_t*) sf_;
+}
+
+static int sf_subscribe (void *core_, void *sf_,
+ const unsigned char *data_, size_t size_)
+{
+ return ((xs::prefix_filter_t*) sf_)->subscribe (core_, NULL,
+ data_, size_);
+}
+
+static int sf_unsubscribe (void *core_, void *sf_,
+ const unsigned char *data_, size_t size_)
+{
+ return ((xs::prefix_filter_t*) sf_)->unsubscribe (core_, NULL,
+ data_, size_);
+}
+
+static void sf_enumerate (void *core_, void *sf_)
+{
+ ((xs::prefix_filter_t*) sf_)->enumerate (core_);
+}
+
+static int sf_match (void *core_, void *sf_,
+ const unsigned char *data_, size_t size_)
+{
+ return ((xs::prefix_filter_t*) sf_)->match (core_, data_, size_);
+}
+
+static xs_filter_t filter = {
+ XS_PLUGIN_FILTER,
+ 1,
+ id,
+ pf_create,
+ pf_destroy,
+ pf_subscribe,
+ pf_unsubscribe,
+ pf_unsubscribe_all,
+ pf_match,
+ sf_create,
+ sf_destroy,
+ sf_subscribe,
+ sf_unsubscribe,
+ sf_enumerate,
+ sf_match,
+};
+
+void *xs::prefix_filter = (void*) &filter;
+
diff --git a/src/prefix_filter.hpp b/src/prefix_filter.hpp
new file mode 100644
index 0000000..0faa865
--- /dev/null
+++ b/src/prefix_filter.hpp
@@ -0,0 +1,108 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2011-2012 Spotify AB
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_PREFIX_FILTER_HPP_INCLUDED__
+#define __XS_PREFIX_FILTER_HPP_INCLUDED__
+
+#include <stddef.h>
+#include <map>
+
+#include "stdint.hpp"
+
+namespace xs
+{
+
+ // Canonical extension object.
+ extern void *prefix_filter;
+
+ class prefix_filter_t
+ {
+ public:
+
+ prefix_filter_t ();
+ ~prefix_filter_t ();
+
+ int subscribe (void *core_, void *subscriber_,
+ const unsigned char *data_, size_t size_);
+ int unsubscribe (void *core_, void *subscriber_,
+ const unsigned char *data_, size_t size_);
+ void unsubscribe_all (void *core_, void *subscriber_);
+ void enumerate (void *core_);
+ int match (void *core_, const unsigned char *data_, size_t size_);
+ void match_all (void *core_, const unsigned char *data_, size_t size_);
+
+ private:
+
+ struct node_t
+ {
+ // Pointer to particular subscriber associated with
+ // the reference count.
+ typedef std::map <void*, int> subscribers_t;
+ subscribers_t *subscribers;
+
+ unsigned char min;
+ unsigned short count;
+ unsigned short live_nodes;
+ union {
+ struct node_t *node;
+ struct node_t **table;
+ } next;
+
+ };
+
+ static void init (node_t *node_);
+ static void close (node_t *node_);
+
+ // Add key to the trie. Returns true if it's a new subscription
+ // rather than a duplicate.
+ static bool add (node_t *node_, const unsigned char *prefix_,
+ size_t size_, void *subscriber_);
+
+ // Remove specific subscription from the trie. Return true is it
+ // was actually removed rather than de-duplicated.
+ static bool rm (node_t *node_, const unsigned char *prefix_,
+ size_t size_, void *subscriber_);
+
+ // Remove all subscriptions for a specific peer from the trie.
+ // If there are no subscriptions left on some topics, invoke the
+ // supplied callback function.
+ static void rm (node_t *node_, void *subscriber_, void *arg_);
+
+ static void rm_helper (node_t *node_, void *subscriber_,
+ unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_,
+ void *arg_);
+
+ // Lists all the subscriptions in the trie.
+ static void list (node_t *node_, unsigned char **buff_,
+ size_t buffsize_, size_t maxbuffsize_, void *arg_);
+
+ // Checks whether node can be safely removed.
+ static bool is_redundant (node_t *node_);
+
+ node_t root;
+
+ prefix_filter_t (const prefix_filter_t&);
+ const prefix_filter_t &operator = (const prefix_filter_t&);
+ };
+
+}
+
+#endif
diff --git a/src/sub.cpp b/src/sub.cpp
index 3065345..442dd9b 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -21,6 +21,7 @@
#include "sub.hpp"
#include "msg.hpp"
+#include "wire.hpp"
xs::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
xsub_t (parent_, tid_, sid_)
@@ -51,14 +52,28 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_,
// Create the subscription message.
msg_t msg;
+ int rc = msg.init_size (optvallen_ + 4);
+ errno_assert (rc == 0);
+ unsigned char *data = (unsigned char*) msg.data ();
+ if (option_ == XS_SUBSCRIBE)
+ put_uint16 (data, XS_CMD_SUBSCRIBE);
+ else if (option_ == XS_UNSUBSCRIBE)
+ put_uint16 (data, XS_CMD_UNSUBSCRIBE);
+ put_uint16 (data + 2, options.filter_id);
+ memcpy (data + 4, optval_, optvallen_);
+
+#if 0
+ // TODO: This is 0MQ/3.1 protocol.
+ msg_t msg;
int rc = msg.init_size (optvallen_ + 1);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
if (option_ == XS_SUBSCRIBE)
- *data = 1;
+ data [0] = 1;
else if (option_ == XS_UNSUBSCRIBE)
- *data = 0;
+ data [0] = 0;
memcpy (data + 1, optval_, optvallen_);
+#endif
// Pass it further on in the stack.
int err = 0;
diff --git a/src/trie.cpp b/src/trie.cpp
deleted file mode 100644
index af6cdd9..0000000
--- a/src/trie.cpp
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- Copyright (c) 2009-2012 250bpm s.r.o.
- Copyright (c) 2007-2009 iMatix Corporation
- Copyright (c) 2011-2012 Spotify AB
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of Crossroads I/O project.
-
- Crossroads I/O is free software; you can redistribute it and/or modify it
- under the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- Crossroads is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <stdlib.h>
-
-#include <new>
-#include <algorithm>
-
-#include "platform.hpp"
-#if defined XS_HAVE_WINDOWS
-#include "windows.hpp"
-#endif
-
-#include "err.hpp"
-#include "trie.hpp"
-
-xs::trie_t::trie_t () :
- refcnt (0),
- min (0),
- count (0),
- live_nodes (0)
-{
-}
-
-xs::trie_t::~trie_t ()
-{
- if (count == 1) {
- xs_assert (next.node);
- delete next.node;
- next.node = 0;
- }
- else if (count > 1) {
- for (unsigned short i = 0; i != count; ++i)
- if (next.table [i])
- delete next.table [i];
- free (next.table);
- }
-}
-
-bool xs::trie_t::add (unsigned char *prefix_, size_t size_)
-{
- // We are at the node corresponding to the prefix. We are done.
- if (!size_) {
- ++refcnt;
- return refcnt == 1;
- }
-
- unsigned char c = *prefix_;
- if (c < min || c >= min + count) {
-
- // The character is out of range of currently handled
- // charcters. We have to extend the table.
- if (!count) {
- min = c;
- count = 1;
- next.node = NULL;
- }
- else if (count == 1) {
- unsigned char oldc = min;
- trie_t *oldp = next.node;
- count = (min < c ? c - min : min - c) + 1;
- next.table = (trie_t**)
- malloc (sizeof (trie_t*) * count);
- xs_assert (next.table);
- for (unsigned short i = 0; i != count; ++i)
- next.table [i] = 0;
- min = std::min (min, c);
- next.table [oldc - min] = oldp;
- }
- else if (min < c) {
-
- // The new character is above the current character range.
- unsigned short old_count = count;
- count = c - min + 1;
- next.table = (trie_t**) realloc ((void*) next.table,
- sizeof (trie_t*) * count);
- xs_assert (next.table);
- for (unsigned short i = old_count; i != count; i++)
- next.table [i] = NULL;
- }
- else {
-
- // The new character is below the current character range.
- unsigned short old_count = count;
- count = (min + old_count) - c;
- next.table = (trie_t**) realloc ((void*) next.table,
- sizeof (trie_t*) * count);
- xs_assert (next.table);
- memmove (next.table + min - c, next.table,
- old_count * sizeof (trie_t*));
- for (unsigned short i = 0; i != min - c; i++)
- next.table [i] = NULL;
- min = c;
- }
- }
-
- // If next node does not exist, create one.
- if (count == 1) {
- if (!next.node) {
- next.node = new (std::nothrow) trie_t;
- xs_assert (next.node);
- ++live_nodes;
- xs_assert (live_nodes == 1);
- }
- return next.node->add (prefix_ + 1, size_ - 1);
- }
- else {
- if (!next.table [c - min]) {
- next.table [c - min] = new (std::nothrow) trie_t;
- xs_assert (next.table [c - min]);
- ++live_nodes;
- xs_assert (live_nodes > 1);
- }
- return next.table [c - min]->add (prefix_ + 1, size_ - 1);
- }
-}
-
-bool xs::trie_t::rm (unsigned char *prefix_, size_t size_)
-{
- // TODO: Shouldn't an error be reported if the key does not exist?
-
- if (!size_) {
- if (!refcnt)
- return false;
- refcnt--;
- return refcnt == 0;
- }
-
- unsigned char c = *prefix_;
- if (!count || c < min || c >= min + count)
- return false;
-
- trie_t *next_node =
- count == 1 ? next.node : next.table [c - min];
-
- if (!next_node)
- return false;
-
- bool ret = next_node->rm (prefix_ + 1, size_ - 1);
-
- // Prune redundant nodes
- if (next_node->is_redundant ()) {
- delete next_node;
- xs_assert (count > 0);
-
- if (count == 1) {
- // The just pruned node is was the only live node
- next.node = 0;
- count = 0;
- --live_nodes;
- xs_assert (live_nodes == 0);
- }
- else {
- next.table [c - min] = 0;
- xs_assert (live_nodes > 1);
- --live_nodes;
-
- // Compact the table if possible
- if (live_nodes == 1) {
- // We can switch to using the more compact single-node
- // representation since the table only contains one live node
- trie_t *node = 0;
- // Since we always compact the table the pruned node must
- // either be the left-most or right-most ptr in the node
- // table
- if (c == min) {
- // The pruned node is the left-most node ptr in the
- // node table => keep the right-most node
- node = next.table [count - 1];
- min += count - 1;
- }
- else if (c == min + count - 1) {
- // The pruned node is the right-most node ptr in the
- // node table => keep the left-most node
- node = next.table [0];
- }
-
- xs_assert (node);
- free (next.table);
- next.node = node;
- count = 1;
- }
- else if (c == min) {
- // We can compact the table "from the left".
- // Find the left-most non-null node ptr, which we'll use as
- // our new min
- unsigned char new_min = min;
- for (unsigned short i = 1; i < count; ++i) {
- if (next.table [i]) {
- new_min = i + min;
- break;
- }
- }
- xs_assert (new_min != min);
-
- trie_t **old_table = next.table;
- xs_assert (new_min > min);
- xs_assert (count > new_min - min);
-
- count = count - (new_min - min);
- next.table = (trie_t**) malloc (sizeof (trie_t*) * count);
- xs_assert (next.table);
-
- memmove (next.table, old_table + (new_min - min),
- sizeof (trie_t*) * count);
- free (old_table);
-
- min = new_min;
- }
- else if (c == min + count - 1) {
- // We can compact the table "from the right".
- // Find the right-most non-null node ptr, which we'll use to
- // determine the new table size
- unsigned short new_count = count;
- for (unsigned short i = 1; i < count; ++i) {
- if (next.table [count - 1 - i]) {
- new_count = count - i;
- break;
- }
- }
- xs_assert (new_count != count);
- count = new_count;
-
- trie_t **old_table = next.table;
- next.table = (trie_t**) malloc (sizeof (trie_t*) * count);
- xs_assert (next.table);
-
- memmove (next.table, old_table, sizeof (trie_t*) * count);
- free (old_table);
- }
- }
- }
-
- return ret;
-}
-
-bool xs::trie_t::check (unsigned char *data_, size_t size_)
-{
- // This function is on critical path. It deliberately doesn't use
- // recursion to get a bit better performance.
- trie_t *current = this;
- while (true) {
-
- // We've found a corresponding subscription!
- if (current->refcnt)
- return true;
-
- // We've checked all the data and haven't found matching subscription.
- if (!size_)
- return false;
-
- // If there's no corresponding slot for the first character
- // of the prefix, the message does not match.
- unsigned char c = *data_;
- if (c < current->min || c >= current->min + current->count)
- return false;
-
- // Move to the next character.
- if (current->count == 1)
- current = current->next.node;
- else {
- current = current->next.table [c - current->min];
- if (!current)
- return false;
- }
- data_++;
- size_--;
- }
-}
-
-void xs::trie_t::apply (void (*func_) (unsigned char *data_, size_t size_,
- void *arg_), void *arg_)
-{
- unsigned char *buff = NULL;
- apply_helper (&buff, 0, 0, func_, arg_);
- free (buff);
-}
-
-void xs::trie_t::apply_helper (
- unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_,
- void (*func_) (unsigned char *data_, size_t size_, void *arg_), void *arg_)
-{
- // If this node is a subscription, apply the function.
- if (refcnt)
- func_ (*buff_, buffsize_, arg_);
-
- // Adjust the buffer.
- if (buffsize_ >= maxbuffsize_) {
- maxbuffsize_ = buffsize_ + 256;
- *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
- xs_assert (*buff_);
- }
-
- // If there are no subnodes in the trie, return.
- if (count == 0)
- return;
-
- // If there's one subnode (optimisation).
- if (count == 1) {
- (*buff_) [buffsize_] = min;
- buffsize_++;
- next.node->apply_helper (buff_, buffsize_, maxbuffsize_, func_, arg_);
- return;
- }
-
- // If there are multiple subnodes.
- for (unsigned short c = 0; c != count; c++) {
- (*buff_) [buffsize_] = min + c;
- if (next.table [c])
- next.table [c]->apply_helper (buff_, buffsize_ + 1, maxbuffsize_,
- func_, arg_);
- }
-}
-
-bool xs::trie_t::is_redundant () const
-{
- return refcnt == 0 && live_nodes == 0;
-}
-
diff --git a/src/trie.hpp b/src/trie.hpp
deleted file mode 100644
index 3e65a1a..0000000
--- a/src/trie.hpp
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- Copyright (c) 2009-2012 250bpm s.r.o.
- Copyright (c) 2007-2009 iMatix Corporation
- Copyright (c) 2011-2012 Spotify AB
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of Crossroads I/O project.
-
- Crossroads I/O is free software; you can redistribute it and/or modify it
- under the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- Crossroads is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __XS_TRIE_HPP_INCLUDED__
-#define __XS_TRIE_HPP_INCLUDED__
-
-#include <stddef.h>
-
-#include "stdint.hpp"
-
-namespace xs
-{
-
- class trie_t
- {
- public:
-
- trie_t ();
- ~trie_t ();
-
- // Add key to the trie. Returns true if this is a new item in the trie
- // rather than a duplicate.
- bool add (unsigned char *prefix_, size_t size_);
-
- // Remove key from the trie. Returns true if the item is actually
- // removed from the trie.
- bool rm (unsigned char *prefix_, size_t size_);
-
- // Check whether particular key is in the trie.
- bool check (unsigned char *data_, size_t size_);
-
- // Apply the function supplied to each subscription in the trie.
- void apply (void (*func_) (unsigned char *data_, size_t size_,
- void *arg_), void *arg_);
-
- private:
-
- void apply_helper (
- unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_,
- void (*func_) (unsigned char *data_, size_t size_, void *arg_),
- void *arg_);
- bool is_redundant () const;
-
- uint32_t refcnt;
- unsigned char min;
- unsigned short count;
- unsigned short live_nodes;
- union {
- class trie_t *node;
- class trie_t **table;
- } next;
-
- trie_t (const trie_t&);
- const trie_t &operator = (const trie_t&);
- };
-
-}
-
-#endif
-
diff --git a/src/wire.hpp b/src/wire.hpp
index 6d1e11b..f840fce 100644
--- a/src/wire.hpp
+++ b/src/wire.hpp
@@ -23,6 +23,10 @@
#include "stdint.hpp"
+// Protocol-related constants.
+#define XS_CMD_SUBSCRIBE 1
+#define XS_CMD_UNSUBSCRIBE 2
+
namespace xs
{
diff --git a/src/xpub.cpp b/src/xpub.cpp
index 255c063..fe0b9a7 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -21,20 +21,27 @@
#include <string.h>
+#include "../include/xs.h"
+
#include "xpub.hpp"
#include "pipe.hpp"
+#include "wire.hpp"
#include "err.hpp"
#include "msg.hpp"
xs::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
- more (false)
+ more (false),
+ tmp_filter_id (-1)
{
options.type = XS_XPUB;
}
xs::xpub_t::~xpub_t ()
{
+ // Deallocate all the filters.
+ for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it)
+ it->type->pf_destroy ((void*) (core_t*) this, it->instance);
}
void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
@@ -46,8 +53,27 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
// to all data on this pipe, implicitly. Also, if we are using
// 0MQ/2.1-style protocol, there's no subscription forwarding. Thus,
// we need to subscribe for all messages automatically.
- if (icanhasall_ || pipe_->get_protocol () == 1)
- subscriptions.add (NULL, 0, pipe_);
+ if (icanhasall_|| pipe_->get_protocol () == 1) {
+
+ // Find the prefix filter.
+ // TODO: Change this to ALL filter.
+ filters_t::iterator it;
+ for (it = filters.begin (); it != filters.end (); ++it)
+ if (it->type->id (NULL) == XS_FILTER_PREFIX)
+ break;
+ if (it == filters.end ()) {
+ filter_t f;
+ f.type = get_filter (XS_FILTER_PREFIX);
+ xs_assert (f.type);
+ f.instance = f.type->pf_create ((void*) (core_t*) this);
+ xs_assert (f.instance);
+ filters.push_back (f);
+ it = filters.end () - 1;
+ }
+
+ it->type->pf_subscribe ((void*) (core_t*) this, it->instance, pipe_,
+ NULL, 0);
+ }
// The pipe is active when attached. Let's read the subscriptions from
// it, if any.
@@ -73,16 +99,72 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)
// TODO: In the case of malformed subscription we will simply ignore
// it for now. However, we should close the connection instead.
- if (size <= 0 || (*data == 0 && *data == 1)) {
+ if (size < 4) {
+ sub.close ();
+ return;
+ }
+
+#if 0
+ // TODO: This is 0MQ/3.1 protocol.
+ if (size < 1) {
sub.close ();
return;
}
+#endif
+
+ int cmd = get_uint16 (data);
+ int filter_id = get_uint16 (data + 2);
+
+#if 0
+ // TODO: This is 0MQ/3.1 protocol.
+ int cmd = data [0] ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE;
+ int filter_id = XS_FILTER_PREFIX;
+#endif
+
+ if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
+ sub.close ();
+ return;
+ }
+
+ // Find the relevant filter.
+ filters_t::iterator it;
+ for (it = filters.begin (); it != filters.end (); ++it)
+ if (it->type->id (NULL) == filter_id)
+ break;
bool unique;
- if (*data == 0)
- unique = subscriptions.rm (data + 1, size - 1, pipe_);
- else
- unique = subscriptions.add (data + 1, size - 1, pipe_);
+ if (cmd == XS_CMD_UNSUBSCRIBE) {
+ xs_assert (it != filters.end ());
+ unique = it->type->pf_unsubscribe ((void*) (core_t*) this,
+ it->instance, pipe_, data + 4, size - 4) ? true : false;
+#if 0
+ // TODO: This is 0MQ/3.1 protocol.
+ unique = it->type->pf_unsubscribe ((void*) (core_t*) this,
+ it->instance, pipe_, data + 1, size - 1) ? true : false;
+#endif
+ }
+ else {
+
+ // If the filter of the specified type does not exist yet,
+ // create it.
+ if (it == filters.end ()) {
+ filter_t f;
+ f.type = get_filter (filter_id);
+ xs_assert (f.type);
+ f.instance = f.type->pf_create ((void*) (core_t*) this);
+ xs_assert (f.instance);
+ filters.push_back (f);
+ it = filters.end () - 1;
+ }
+
+ unique = it->type->pf_subscribe ((void*) (core_t*) this,
+ it->instance, pipe_, data + 4, size - 4) ? true : false;
+#if 0
+ // TODO: This is 0MQ/3.1 protocol.
+ unique = it->type->pf_subscribe ((void*) (core_t*) this,
+ it->instance, pipe_, data + 1, size - 1) ? true : false;
+#endif
+ }
// If the subscription is not a duplicate store it so that it can be
// passed to used on next recv call.
@@ -100,28 +182,29 @@ void xs::xpub_t::xwrite_activated (pipe_t *pipe_)
void xs::xpub_t::xterminated (pipe_t *pipe_)
{
- // Remove the pipe from the trie. If there are topics that nobody
- // is interested in anymore, send corresponding unsubscriptions
- // upstream.
- subscriptions.rm (pipe_, send_unsubscription, this);
+ // Remove the pipe from all the filters.
+ for (filters_t::iterator it = filters.begin (); it != filters.end ();
+ ++it) {
+ tmp_filter_id = it->type->id (NULL);
+ it->type->pf_unsubscribe_all ((void*) (core_t*) this, it->instance,
+ (void*) pipe_);
+ tmp_filter_id = -1;
+ }
dist.terminated (pipe_);
}
-void xs::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
-{
- xpub_t *self = (xpub_t*) arg_;
- self->dist.match (pipe_);
-}
-
int xs::xpub_t::xsend (msg_t *msg_, int flags_)
{
bool msg_more = msg_->flags () & msg_t::more ? true : false;
// For the first part of multi-part message, find the matching pipes.
- if (!more)
- subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
- mark_as_matching, this);
+ if (!more) {
+ for (filters_t::iterator it = filters.begin (); it != filters.end ();
+ ++it)
+ it->type->pf_match ((void*) (core_t*) this, it->instance,
+ (unsigned char*) msg_->data (), msg_->size ());
+ }
// Send the message to all the pipes that were marked as matching
// in the previous step.
@@ -167,21 +250,35 @@ bool xs::xpub_t::xhas_in ()
return !pending.empty ();
}
-void xs::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
- void *arg_)
+int xs::xpub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)
{
- xpub_t *self = (xpub_t*) arg_;
-
- if (self->options.type != XS_PUB) {
+ // In XS_PUB socket, the subscriptions are not passed upstream.
+ if (options.type != XS_PUB) {
// Place the unsubscription to the queue of pending (un)sunscriptions
// to be retrived by the user later on.
- xpub_t *self = (xpub_t*) arg_;
+ blob_t unsub (size_ + 4, 0);
+ put_uint16 ((unsigned char*) unsub.data (), XS_CMD_UNSUBSCRIBE);
+ put_uint16 ((unsigned char*) unsub.data () + 2, tmp_filter_id);
+ memcpy ((void*) (unsub.data () + 4), data_, size_);
+
+#if 0
+ // TODO: This is 0MQ/3.1 protocol.
blob_t unsub (size_ + 1, 0);
unsub [0] = 0;
memcpy ((void*) (unsub.data () + 1), data_, size_);
- self->pending.push_back (unsub);
+#endif
+
+ pending.push_back (unsub);
}
+
+ return 0;
+}
+
+int xs::xpub_t::filter_matching (void *subscriber_)
+{
+ dist.match ((xs::pipe_t*) subscriber_);
+ return 0;
}
xs::xpub_session_t::xpub_session_t (io_thread_t *io_thread_, bool connect_,
diff --git a/src/xpub.hpp b/src/xpub.hpp
index 7f29dcc..c0e47ff 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -21,15 +21,17 @@
#ifndef __XS_XPUB_HPP_INCLUDED__
#define __XS_XPUB_HPP_INCLUDED__
+#include "../include/xs.h"
+
#include <deque>
#include <string>
#include "socket_base.hpp"
#include "session_base.hpp"
-#include "mtrie.hpp"
#include "array.hpp"
#include "dist.hpp"
#include "blob.hpp"
+#include "core.hpp"
namespace xs
{
@@ -39,8 +41,7 @@ namespace xs
class pipe_t;
class io_thread_t;
- class xpub_t :
- public socket_base_t
+ class xpub_t : public socket_base_t, public core_t
{
public:
@@ -59,16 +60,18 @@ namespace xs
private:
- // Function to be applied to the trie to send all the subsciptions
- // upstream.
- static void send_unsubscription (unsigned char *data_, size_t size_,
- void *arg_);
-
- // Function to be applied to each matching pipes.
- static void mark_as_matching (xs::pipe_t *pipe_, void *arg_);
+ // Overloaded functions from core_t.
+ int filter_unsubscribed (const unsigned char *data_, size_t size_);
+ int filter_matching (void *subscriber_);
- // List of all subscriptions mapped to corresponding pipes.
- mtrie_t subscriptions;
+ // The repository of subscriptions.
+ struct filter_t
+ {
+ xs_filter_t *type;
+ void *instance;
+ };
+ typedef std::vector <filter_t> filters_t;
+ filters_t filters;
// Distributor of messages holding the list of outbound pipes.
dist_t dist;
@@ -81,6 +84,9 @@ namespace xs
typedef std::deque <blob_t> pending_t;
pending_t pending;
+ // Different values stored while filter extensions are being executed.
+ int tmp_filter_id;
+
xpub_t (const xpub_t&);
const xpub_t &operator = (const xpub_t&);
};
diff --git a/src/xsub.cpp b/src/xsub.cpp
index af6789f..da56586 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -23,11 +23,14 @@
#include "xsub.hpp"
#include "err.hpp"
+#include "wire.hpp"
xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
has_message (false),
- more (false)
+ more (false),
+ tmp_pipe (NULL),
+ tmp_filter_id (-1)
{
options.type = XS_XSUB;
@@ -44,6 +47,10 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
xs::xsub_t::~xsub_t ()
{
+ // Deallocate all the filters.
+ for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it)
+ it->type->sf_destroy ((void*) (core_t*) this, it->instance);
+
int rc = message.close ();
errno_assert (rc == 0);
}
@@ -59,8 +66,15 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
dist.attach (pipe_);
// Send all the cached subscriptions to the new upstream peer.
- subscriptions.apply (send_subscription, pipe_);
+ tmp_pipe = pipe_;
+ for (filters_t::iterator it = filters.begin (); it != filters.end ();
+ ++it) {
+ tmp_filter_id = it->type->id (NULL);
+ it->type->sf_enumerate ((void*) (core_t*) this, it->instance);
+ tmp_filter_id = -1;
+ }
pipe_->flush ();
+ tmp_pipe = NULL;
}
void xs::xsub_t::xread_activated (pipe_t *pipe_)
@@ -82,11 +96,17 @@ void xs::xsub_t::xterminated (pipe_t *pipe_)
void xs::xsub_t::xhiccuped (pipe_t *pipe_)
{
+ // Send all the cached subscriptions to the hiccuped pipe.
if (pipe_->get_protocol () != 1) {
-
- // Send all the cached subscriptions to the hiccuped pipe.
- subscriptions.apply (send_subscription, pipe_);
+ tmp_pipe = pipe_;
+ for (filters_t::iterator it = filters.begin (); it != filters.end ();
+ ++it) {
+ tmp_filter_id = it->type->id (NULL);
+ it->type->sf_enumerate ((void*) (core_t*) this, it->instance);
+ tmp_filter_id = -1;
+ }
pipe_->flush ();
+ tmp_pipe = NULL;
}
}
@@ -95,21 +115,69 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)
size_t size = msg_->size ();
unsigned char *data = (unsigned char*) msg_->data ();
- // Malformed subscriptions.
- if (size < 1 || (*data != 0 && *data != 1)) {
+ if (size < 4) {
+ errno = EINVAL;
+ return -1;
+ }
+ int cmd = get_uint16 (data);
+ int filter_id = get_uint16 (data + 2);
+
+#if 0
+ // TODO: This is 0MQ/3.1 protocol.
+ if (size < 1) {
+ errno = EINVAL;
+ return -1;
+ }
+ int cmd = data [0] ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE;
+ int filter_id = XS_FILTER_PREFIX;
+#endif
+
+ if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
errno = EINVAL;
return -1;
}
+
+ // Find the relevant filter.
+ filters_t::iterator it;
+ for (it = filters.begin (); it != filters.end (); ++it)
+ if (it->type->id (NULL) == filter_id)
+ break;
+
+ // Process the subscription.
+ if (cmd == XS_CMD_SUBSCRIBE) {
+
+ // If the filter of the specified type does not exist yet, create it.
+ if (it == filters.end ()) {
+ filter_t f;
+ f.type = get_filter (filter_id);
+ xs_assert (f.type);
+ f.instance = f.type->sf_create ((void*) (core_t*) this);
+ xs_assert (f.instance);
+ filters.push_back (f);
+ it = filters.end () - 1;
+ }
- // Process the subscription.
- if (*data == 1) {
- if (subscriptions.add (data + 1, size - 1))
+ if (it->type->sf_subscribe ((void*) (core_t*) this,
+ it->instance, data + 4, size - 4) == 1)
+#if 0
+ // TODO: This is 0MQ/3.1 protocol.
+ if (it->type->sf_subscribe ((void*) (core_t*) this,
+ it->instance, data + 1, size - 1) == 1)
+#endif
return dist.send_to_all (msg_, flags_);
else
return 0;
}
- else if (*data == 0) {
- if (subscriptions.rm (data + 1, size - 1))
+ else if (cmd == XS_CMD_UNSUBSCRIBE) {
+ xs_assert (it != filters.end ());
+
+ if (it->type->sf_unsubscribe ((void*) (core_t*) this,
+ it->instance, data + 4, size - 4) == 1)
+#if 0
+ // TODO: This is 0MQ/3.1 protocol.
+ if (it->type->sf_unsubscribe ((void*) (core_t*) this,
+ it->instance, data + 1, size - 1) == 1)
+#endif
return dist.send_to_all (msg_, flags_);
else
return 0;
@@ -208,24 +276,37 @@ bool xs::xsub_t::xhas_in ()
bool xs::xsub_t::match (msg_t *msg_)
{
- return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
+ for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it)
+ if (it->type->sf_match ((void*) (core_t*) this, it->instance,
+ (unsigned char*) msg_->data (), msg_->size ()))
+ return true;
+ return false;
}
-void xs::xsub_t::send_subscription (unsigned char *data_, size_t size_,
- void *arg_)
+int xs::xsub_t::filter_subscribed (const unsigned char *data_, size_t size_)
{
- pipe_t *pipe = (pipe_t*) arg_;
-
// Create the subsctription message.
msg_t msg;
+ int rc = msg.init_size (size_ + 4);
+ xs_assert (rc == 0);
+ unsigned char *data = (unsigned char*) msg.data ();
+ put_uint16 (data, XS_CMD_SUBSCRIBE);
+ put_uint16 (data + 2, tmp_filter_id);
+ memcpy (data + 4, data_, size_);
+
+#if 0
+ // TODO: This is 0MQ/3.1 protocol.
+ xs_assert (tmp_filter_id == XS_FILTER_PREFIX);
+ msg_t msg;
int rc = msg.init_size (size_ + 1);
xs_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
data [0] = 1;
memcpy (data + 1, data_, size_);
+#endif
// Send it to the pipe.
- bool sent = pipe->write (&msg);
+ bool sent = tmp_pipe->write (&msg);
// If we reached the SNDHWM, and thus cannot send the subscription, drop
// the subscription message instead. This matches the behaviour of
@@ -233,6 +314,8 @@ void xs::xsub_t::send_subscription (unsigned char *data_, size_t size_,
// when the SNDHWM is reached.
if (!sent)
msg.close ();
+
+ return 0;
}
xs::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_,
diff --git a/src/xsub.hpp b/src/xsub.hpp
index 07fe026..4621570 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -21,11 +21,15 @@
#ifndef __XS_XSUB_HPP_INCLUDED__
#define __XS_XSUB_HPP_INCLUDED__
+#include <vector>
+
+#include "../include/xs.h"
+
#include "socket_base.hpp"
#include "session_base.hpp"
#include "dist.hpp"
+#include "core.hpp"
#include "fq.hpp"
-#include "trie.hpp"
namespace xs
{
@@ -34,8 +38,7 @@ namespace xs
class pipe_t;
class io_thread_t;
- class xsub_t :
- public socket_base_t
+ class xsub_t : public socket_base_t, public core_t
{
public:
@@ -57,14 +60,12 @@ namespace xs
private:
+ // Overloads from core_t class.
+ int filter_subscribed (const unsigned char *data_, size_t size_);
+
// Check whether the message matches at least one subscription.
bool match (xs::msg_t *msg_);
- // Function to be applied to the trie to send all the subsciptions
- // upstream.
- static void send_subscription (unsigned char *data_, size_t size_,
- void *arg_);
-
// Fair queueing object for inbound pipes.
fq_t fq;
@@ -72,7 +73,13 @@ namespace xs
dist_t dist;
// The repository of subscriptions.
- trie_t subscriptions;
+ struct filter_t
+ {
+ xs_filter_t *type;
+ void *instance;
+ };
+ typedef std::vector <filter_t> filters_t;
+ filters_t filters;
// If true, 'message' contains a matching message to return on the
// next recv call.
@@ -83,6 +90,10 @@ namespace xs
// there are following parts still waiting.
bool more;
+ // Different values stored while filter extensions are being executed.
+ pipe_t *tmp_pipe;
+ int tmp_filter_id;
+
xsub_t (const xsub_t&);
const xsub_t &operator = (const xsub_t&);
};