From b3f32e219ec15a582b6fd03b155e0861443690f6 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 10 Sep 2009 12:00:47 +0200 Subject: ZMQII-3: cheap and nasty implementation of message filtering --- src/Makefile.am | 2 ++ src/app_thread.cpp | 18 ++++++++++-- src/socket_base.cpp | 5 ++++ src/socket_base.hpp | 2 +- src/sub.cpp | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/sub.hpp | 51 +++++++++++++++++++++++++++++++++ 6 files changed, 157 insertions(+), 3 deletions(-) create mode 100644 src/sub.cpp create mode 100644 src/sub.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index 83670f2..cc71eba 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -53,6 +53,7 @@ libzmq_la_SOURCES = \ simple_semaphore.hpp \ socket_base.hpp \ stdint.hpp \ + sub.hpp \ tcp_connecter.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ @@ -88,6 +89,7 @@ libzmq_la_SOURCES = \ select.cpp \ session.cpp \ socket_base.cpp \ + sub.cpp \ tcp_connecter.cpp \ tcp_listener.cpp \ tcp_socket.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index bdceca5..2bcc724 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -35,6 +35,7 @@ #include "pipe.hpp" #include "config.hpp" #include "socket_base.hpp" +#include "sub.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -135,8 +136,21 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_) zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) { - // TODO: type is ignored for the time being. - socket_base_t *s = new socket_base_t (this); + socket_base_t *s = NULL; + switch (type_) { + case ZMQ_SUB: + s = new sub_t (this); + break; + case ZMQ_P2P: + case ZMQ_PUB: + case ZMQ_REQ: + case ZMQ_REP: + s = new socket_base_t (this); + break; + default: + // TODO: This should be EINVAL. + zmq_assert (false); + } zmq_assert (s); s->set_index (sockets.size ()); sockets.push_back (s); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index ac5a88c..a26c280 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -140,6 +140,11 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, options.identity.assign ((const char*) optval_, optvallen_); return 0; + case ZMQ_SUBSCRIBE: + case ZMQ_UNSUBSCRIBE: + errno = ENOTSUP; + return -1; + default: errno = EINVAL; return -1; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 3f5774f..5711dbe 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -39,7 +39,7 @@ namespace zmq public: socket_base_t (class app_thread_t *parent_); - ~socket_base_t (); + virtual ~socket_base_t (); // Interface for communication with the API layer. virtual int setsockopt (int option_, const void *optval_, diff --git a/src/sub.cpp b/src/sub.cpp new file mode 100644 index 0000000..954eb87 --- /dev/null +++ b/src/sub.cpp @@ -0,0 +1,82 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../c/zmq.h" + +#include "sub.hpp" +#include "err.hpp" + +zmq::sub_t::sub_t (class app_thread_t *parent_) : + socket_base_t (parent_) +{ +} + +zmq::sub_t::~sub_t () +{ +} + +int zmq::sub_t::setsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ == ZMQ_SUBSCRIBE) { + std::string subscription ((const char*) optval_, optvallen_); + subscriptions.insert (subscription); + return 0; + } + + if (option_ == ZMQ_UNSUBSCRIBE) { + std::string subscription ((const char*) optval_, optvallen_); + subscriptions_t::iterator it = subscriptions.find (subscription); + if (it == subscriptions.end ()) { + errno = EINVAL; + return -1; + } + subscriptions.erase (it); + return 0; + } + + return socket_base_t::setsockopt (option_, optval_, optvallen_); +} + +int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_) +{ + while (true) { + + // Get a message. + int rc = socket_base_t::recv (msg_, flags_); + + // If there's no message available, return immediately. + if (rc != 0 && errno == EAGAIN) + return -1; + + // Check the message format. + // TODO: We should either ignore the message or drop the connection + // if the message doesn't conform with the expected format. + unsigned char *data = (unsigned char*) zmq_msg_data (msg_); + zmq_assert (*data <= zmq_msg_size (msg_) - 1); + + // Check whether the message matches at least one subscription. + std::string topic ((const char*) (data + 1), *data); + subscriptions_t::iterator it = subscriptions.find (topic); + if (it != subscriptions.end ()) + break; + } + + return 0; +} diff --git a/src/sub.hpp b/src/sub.hpp new file mode 100644 index 0000000..1d4fdf9 --- /dev/null +++ b/src/sub.hpp @@ -0,0 +1,51 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_SUB_INCLUDED__ +#define __ZMQ_SUB_INCLUDED__ + +#include +#include + +#include "socket_base.hpp" + +namespace zmq +{ + + class sub_t : public socket_base_t + { + public: + + sub_t (class app_thread_t *parent_); + ~sub_t (); + + // Overloads of API functions from socket_base_t. + int setsockopt (int option_, const void *optval_, size_t optvallen_); + int recv (struct zmq_msg_t *msg_, int flags_); + + private: + + // List of all the active subscriptions. + typedef std::multiset subscriptions_t; + subscriptions_t subscriptions; + }; + +} + +#endif -- cgit v1.2.3