diff options
-rw-r--r-- | include/xs/xs.h | 1 | ||||
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/ctx.cpp | 4 | ||||
-rw-r--r-- | src/prefix_filter.cpp | 2 | ||||
-rw-r--r-- | src/topic_filter.cpp | 221 | ||||
-rw-r--r-- | src/topic_filter.hpp | 32 |
6 files changed, 260 insertions, 2 deletions
diff --git a/include/xs/xs.h b/include/xs/xs.h index 9b09812..120bd1f 100644 --- a/include/xs/xs.h +++ b/include/xs/xs.h @@ -278,6 +278,7 @@ XS_EXPORT unsigned long xs_stopwatch_stop (void *watch); #define XS_FILTER_ALL 0 #define XS_FILTER_PREFIX 1 +#define XS_FILTER_TOPIC 2 typedef struct { diff --git a/src/Makefile.am b/src/Makefile.am index 8c3de7c..1daee23 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -77,6 +77,7 @@ libxs_la_SOURCES = \ tcp_connecter.hpp \ tcp_listener.hpp \ thread.hpp \ + topic_filter.hpp \ upoll.hpp \ windows.hpp \ wire.hpp \ @@ -136,6 +137,7 @@ libxs_la_SOURCES = \ tcp_connecter.cpp \ tcp_listener.cpp \ thread.cpp \ + topic_filter.cpp \ upoll.cpp \ xpub.cpp \ xrep.cpp \ diff --git a/src/ctx.cpp b/src/ctx.cpp index 7f4c88a..6460dd2 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -56,6 +56,7 @@ #include "err.hpp" #include "msg.hpp" #include "prefix_filter.hpp" +#include "topic_filter.hpp" xs::ctx_t::ctx_t () : tag (0xbadcafe0), @@ -73,7 +74,8 @@ xs::ctx_t::ctx_t () : // Plug in the standard plugins. rc = plug (prefix_filter); errno_assert (rc == 0); - + rc = plug (topic_filter); + errno_assert (rc == 0); // Now plug in all the extensions found in plugin directory. diff --git a/src/prefix_filter.cpp b/src/prefix_filter.cpp index 06a77f5..abf68d0 100644 --- a/src/prefix_filter.cpp +++ b/src/prefix_filter.cpp @@ -594,7 +594,7 @@ static xs_filter_t pfx_filter = { sf_destroy, sf_subscribe, sf_unsubscribe, - sf_match, + sf_match }; void *xs::prefix_filter = (void*) &pfx_filter; diff --git a/src/topic_filter.cpp b/src/topic_filter.cpp new file mode 100644 index 0000000..8fda252 --- /dev/null +++ b/src/topic_filter.cpp @@ -0,0 +1,221 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <vector> +#include <string> +#include <map> + +#include "../include/xs/xs.h" + +#include "topic_filter.hpp" +#include "err.hpp" + +typedef std::vector <void*> subscribers_t; +typedef std::map <std::string, subscribers_t> topic_t; + +static bool topic_match (const char *topic_, + const unsigned char *data_, size_t size_) +{ + while (true) { + + // Check whether matching is done. + if (*topic_ == 0) + return true; + + // Match one element. + if (topic_ [0] == '*') { + + // March wildcard element. + ++topic_; + while (size_ && *data_ != 0 && *data_ != '.') + ++data_, --size_; + } + else { + + // Match literal element. + while (true) { + if (topic_ [0] == '.' || topic_ [0] == 0) + break; + if (!size_) + return false; + if (topic_ [0] != *data_) + return false; + ++data_; + --size_; + ++topic_; + } + } + + // Check whether matching is done. + if (topic_ [0] == 0) + return true; + + // Match dot. + if (topic_ [0] != '.') + return false; // Malformed subscription, e.g. "*abc" + if (!size_) + return false; + if (*data_ != '.') + return false; + ++data_; + --size_; + ++topic_; + } +} + +static int id (void *core_) +{ + return XS_FILTER_TOPIC; +} + +static void *pf_create (void *core_) +{ + topic_t *pf = new (std::nothrow) topic_t; + alloc_assert (pf); + return (void*) pf; +} + +static void pf_destroy (void *core_, void *pf_) +{ + xs_assert (pf_); + delete (topic_t*) pf_; +} + +static int pf_subscribe (void *core_, void *pf_, void *subscriber_, + const unsigned char *data_, size_t size_) +{ + topic_t *self = (topic_t*) pf_; + (*self) [std::string ((char*) data_, size_)].push_back (subscriber_); + return xs_filter_subscribed (core_, data_, size_); +} + +static int pf_unsubscribe (void *core_, void *pf_, void *subscriber_, + const unsigned char *data_, size_t size_) +{ + topic_t *self = (topic_t*) pf_; + + topic_t::iterator it = self->find (std::string ((char*) data_, size_)); + if (it == self->end ()) { + errno = EINVAL; + return -1; + } + + bool found = false; + for (subscribers_t::iterator its = it->second.begin (); + its != it->second.end (); ++its) { + if (*its == subscriber_) { + it->second.erase (its); + found = true; + break; + } + } + if (!found) { + errno = EINVAL; + return -1; + } + + if (it->second.empty ()) + self->erase (it); + + return 0; +} + +static void pf_unsubscribe_all (void *core_, void *pf_, void *subscriber_) +{ + topic_t *self = (topic_t*) pf_; + + for (topic_t::iterator it = self->begin (); it != self->end ();) { + for (subscribers_t::size_type i = 0; i < it->second.size (); ++i) { + if (it->second [i] == subscriber_) + it->second.erase (it->second.begin () + i--); + } + if (it->second.empty ()) + self->erase (it++); + else + ++it; + } +} + +static void pf_match (void *core_, void *pf_, + const unsigned char *data_, size_t size_) +{ + topic_t *self = (topic_t*) pf_; + for (topic_t::iterator it = self->begin (); it != self->end (); ++it) { + if (topic_match (it->first.c_str (), data_, size_)) { + for (subscribers_t::iterator its = it->second.begin (); + its != it->second.end (); ++its) { + int rc = xs_filter_matching (core_, *its); + errno_assert (rc == 0); + } + } + } +} + +static void *sf_create (void *core_) +{ + return pf_create (core_); +} + +static void sf_destroy (void *core_, void *sf_) +{ + pf_destroy (core_, sf_); +} + +static int sf_subscribe (void *core_, void *sf_, + const unsigned char *data_, size_t size_) +{ + return pf_subscribe (core_, sf_, NULL, data_, size_); +} + +static int sf_unsubscribe (void *core_, void *sf_, + const unsigned char *data_, size_t size_) +{ + return pf_unsubscribe (core_, sf_, NULL, data_, size_); +} + +static int sf_match (void *core_, void *sf_, + const unsigned char *data_, size_t size_) +{ + topic_t *self = (topic_t*) sf_; + for (topic_t::iterator it = self->begin (); it != self->end (); ++it) + if (topic_match (it->first.c_str (), data_, size_)) + return 1; + return 0; +} + +static xs_filter_t rgxp_filter = { + XS_PLUGIN_FILTER, + 1, + id, + pf_create, + pf_destroy, + pf_subscribe, + pf_unsubscribe, + pf_unsubscribe_all, + pf_match, + sf_create, + sf_destroy, + sf_subscribe, + sf_unsubscribe, + sf_match +}; + +void *xs::topic_filter = (void*) &rgxp_filter; + diff --git a/src/topic_filter.hpp b/src/topic_filter.hpp new file mode 100644 index 0000000..bc81675 --- /dev/null +++ b/src/topic_filter.hpp @@ -0,0 +1,32 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __XS_TOPIC_FILTER_HPP_INCLUDED__ +#define __XS_TOPIC_FILTER_HPP_INCLUDED__ + +namespace xs +{ + + // Canonical extension object. + extern void *topic_filter; + +} + +#endif |