summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/xs/xs.h1
-rw-r--r--src/Makefile.am2
-rw-r--r--src/ctx.cpp4
-rw-r--r--src/prefix_filter.cpp2
-rw-r--r--src/topic_filter.cpp221
-rw-r--r--src/topic_filter.hpp32
6 files changed, 260 insertions, 2 deletions
diff --git a/include/xs/xs.h b/include/xs/xs.h
index 9b09812..120bd1f 100644
--- a/include/xs/xs.h
+++ b/include/xs/xs.h
@@ -278,6 +278,7 @@ XS_EXPORT unsigned long xs_stopwatch_stop (void *watch);
#define XS_FILTER_ALL 0
#define XS_FILTER_PREFIX 1
+#define XS_FILTER_TOPIC 2
typedef struct
{
diff --git a/src/Makefile.am b/src/Makefile.am
index 8c3de7c..1daee23 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -77,6 +77,7 @@ libxs_la_SOURCES = \
tcp_connecter.hpp \
tcp_listener.hpp \
thread.hpp \
+ topic_filter.hpp \
upoll.hpp \
windows.hpp \
wire.hpp \
@@ -136,6 +137,7 @@ libxs_la_SOURCES = \
tcp_connecter.cpp \
tcp_listener.cpp \
thread.cpp \
+ topic_filter.cpp \
upoll.cpp \
xpub.cpp \
xrep.cpp \
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 7f4c88a..6460dd2 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -56,6 +56,7 @@
#include "err.hpp"
#include "msg.hpp"
#include "prefix_filter.hpp"
+#include "topic_filter.hpp"
xs::ctx_t::ctx_t () :
tag (0xbadcafe0),
@@ -73,7 +74,8 @@ xs::ctx_t::ctx_t () :
// Plug in the standard plugins.
rc = plug (prefix_filter);
errno_assert (rc == 0);
-
+ rc = plug (topic_filter);
+ errno_assert (rc == 0);
// Now plug in all the extensions found in plugin directory.
diff --git a/src/prefix_filter.cpp b/src/prefix_filter.cpp
index 06a77f5..abf68d0 100644
--- a/src/prefix_filter.cpp
+++ b/src/prefix_filter.cpp
@@ -594,7 +594,7 @@ static xs_filter_t pfx_filter = {
sf_destroy,
sf_subscribe,
sf_unsubscribe,
- sf_match,
+ sf_match
};
void *xs::prefix_filter = (void*) &pfx_filter;
diff --git a/src/topic_filter.cpp b/src/topic_filter.cpp
new file mode 100644
index 0000000..8fda252
--- /dev/null
+++ b/src/topic_filter.cpp
@@ -0,0 +1,221 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <vector>
+#include <string>
+#include <map>
+
+#include "../include/xs/xs.h"
+
+#include "topic_filter.hpp"
+#include "err.hpp"
+
+typedef std::vector <void*> subscribers_t;
+typedef std::map <std::string, subscribers_t> topic_t;
+
+static bool topic_match (const char *topic_,
+ const unsigned char *data_, size_t size_)
+{
+ while (true) {
+
+ // Check whether matching is done.
+ if (*topic_ == 0)
+ return true;
+
+ // Match one element.
+ if (topic_ [0] == '*') {
+
+ // March wildcard element.
+ ++topic_;
+ while (size_ && *data_ != 0 && *data_ != '.')
+ ++data_, --size_;
+ }
+ else {
+
+ // Match literal element.
+ while (true) {
+ if (topic_ [0] == '.' || topic_ [0] == 0)
+ break;
+ if (!size_)
+ return false;
+ if (topic_ [0] != *data_)
+ return false;
+ ++data_;
+ --size_;
+ ++topic_;
+ }
+ }
+
+ // Check whether matching is done.
+ if (topic_ [0] == 0)
+ return true;
+
+ // Match dot.
+ if (topic_ [0] != '.')
+ return false; // Malformed subscription, e.g. "*abc"
+ if (!size_)
+ return false;
+ if (*data_ != '.')
+ return false;
+ ++data_;
+ --size_;
+ ++topic_;
+ }
+}
+
+static int id (void *core_)
+{
+ return XS_FILTER_TOPIC;
+}
+
+static void *pf_create (void *core_)
+{
+ topic_t *pf = new (std::nothrow) topic_t;
+ alloc_assert (pf);
+ return (void*) pf;
+}
+
+static void pf_destroy (void *core_, void *pf_)
+{
+ xs_assert (pf_);
+ delete (topic_t*) pf_;
+}
+
+static int pf_subscribe (void *core_, void *pf_, void *subscriber_,
+ const unsigned char *data_, size_t size_)
+{
+ topic_t *self = (topic_t*) pf_;
+ (*self) [std::string ((char*) data_, size_)].push_back (subscriber_);
+ return xs_filter_subscribed (core_, data_, size_);
+}
+
+static int pf_unsubscribe (void *core_, void *pf_, void *subscriber_,
+ const unsigned char *data_, size_t size_)
+{
+ topic_t *self = (topic_t*) pf_;
+
+ topic_t::iterator it = self->find (std::string ((char*) data_, size_));
+ if (it == self->end ()) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ bool found = false;
+ for (subscribers_t::iterator its = it->second.begin ();
+ its != it->second.end (); ++its) {
+ if (*its == subscriber_) {
+ it->second.erase (its);
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (it->second.empty ())
+ self->erase (it);
+
+ return 0;
+}
+
+static void pf_unsubscribe_all (void *core_, void *pf_, void *subscriber_)
+{
+ topic_t *self = (topic_t*) pf_;
+
+ for (topic_t::iterator it = self->begin (); it != self->end ();) {
+ for (subscribers_t::size_type i = 0; i < it->second.size (); ++i) {
+ if (it->second [i] == subscriber_)
+ it->second.erase (it->second.begin () + i--);
+ }
+ if (it->second.empty ())
+ self->erase (it++);
+ else
+ ++it;
+ }
+}
+
+static void pf_match (void *core_, void *pf_,
+ const unsigned char *data_, size_t size_)
+{
+ topic_t *self = (topic_t*) pf_;
+ for (topic_t::iterator it = self->begin (); it != self->end (); ++it) {
+ if (topic_match (it->first.c_str (), data_, size_)) {
+ for (subscribers_t::iterator its = it->second.begin ();
+ its != it->second.end (); ++its) {
+ int rc = xs_filter_matching (core_, *its);
+ errno_assert (rc == 0);
+ }
+ }
+ }
+}
+
+static void *sf_create (void *core_)
+{
+ return pf_create (core_);
+}
+
+static void sf_destroy (void *core_, void *sf_)
+{
+ pf_destroy (core_, sf_);
+}
+
+static int sf_subscribe (void *core_, void *sf_,
+ const unsigned char *data_, size_t size_)
+{
+ return pf_subscribe (core_, sf_, NULL, data_, size_);
+}
+
+static int sf_unsubscribe (void *core_, void *sf_,
+ const unsigned char *data_, size_t size_)
+{
+ return pf_unsubscribe (core_, sf_, NULL, data_, size_);
+}
+
+static int sf_match (void *core_, void *sf_,
+ const unsigned char *data_, size_t size_)
+{
+ topic_t *self = (topic_t*) sf_;
+ for (topic_t::iterator it = self->begin (); it != self->end (); ++it)
+ if (topic_match (it->first.c_str (), data_, size_))
+ return 1;
+ return 0;
+}
+
+static xs_filter_t rgxp_filter = {
+ XS_PLUGIN_FILTER,
+ 1,
+ id,
+ pf_create,
+ pf_destroy,
+ pf_subscribe,
+ pf_unsubscribe,
+ pf_unsubscribe_all,
+ pf_match,
+ sf_create,
+ sf_destroy,
+ sf_subscribe,
+ sf_unsubscribe,
+ sf_match
+};
+
+void *xs::topic_filter = (void*) &rgxp_filter;
+
diff --git a/src/topic_filter.hpp b/src/topic_filter.hpp
new file mode 100644
index 0000000..bc81675
--- /dev/null
+++ b/src/topic_filter.hpp
@@ -0,0 +1,32 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_TOPIC_FILTER_HPP_INCLUDED__
+#define __XS_TOPIC_FILTER_HPP_INCLUDED__
+
+namespace xs
+{
+
+ // Canonical extension object.
+ extern void *topic_filter;
+
+}
+
+#endif