summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-09-10 12:00:47 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-09-10 12:00:47 +0200
commitb3f32e219ec15a582b6fd03b155e0861443690f6 (patch)
treeb895af0213323bf879e87f6eaab87656eb170392
parentd81d3412520c9fed36f39462840c7d35815f4a87 (diff)
ZMQII-3: cheap and nasty implementation of message filtering
-rw-r--r--c/zmq.h2
-rw-r--r--src/Makefile.am2
-rw-r--r--src/app_thread.cpp18
-rw-r--r--src/socket_base.cpp5
-rw-r--r--src/socket_base.hpp2
-rw-r--r--src/sub.cpp82
-rw-r--r--src/sub.hpp51
7 files changed, 159 insertions, 3 deletions
diff --git a/c/zmq.h b/c/zmq.h
index cc165d1..df6e04c 100644
--- a/c/zmq.h
+++ b/c/zmq.h
@@ -50,6 +50,8 @@ extern "C" {
#define ZMQ_MASK 4
#define ZMQ_AFFINITY 5
#define ZMQ_IDENTITY 6
+#define ZMQ_SUBSCRIBE 7
+#define ZMQ_UNSUBSCRIBE 8
// The operation should be performed in non-blocking mode. I.e. if it cannot
// be processed immediately, error should be returned with errno set to EAGAIN.
diff --git a/src/Makefile.am b/src/Makefile.am
index 83670f2..cc71eba 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -53,6 +53,7 @@ libzmq_la_SOURCES = \
simple_semaphore.hpp \
socket_base.hpp \
stdint.hpp \
+ sub.hpp \
tcp_connecter.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
@@ -88,6 +89,7 @@ libzmq_la_SOURCES = \
select.cpp \
session.cpp \
socket_base.cpp \
+ sub.cpp \
tcp_connecter.cpp \
tcp_listener.cpp \
tcp_socket.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index bdceca5..2bcc724 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -35,6 +35,7 @@
#include "pipe.hpp"
#include "config.hpp"
#include "socket_base.hpp"
+#include "sub.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -135,8 +136,21 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
{
- // TODO: type is ignored for the time being.
- socket_base_t *s = new socket_base_t (this);
+ socket_base_t *s = NULL;
+ switch (type_) {
+ case ZMQ_SUB:
+ s = new sub_t (this);
+ break;
+ case ZMQ_P2P:
+ case ZMQ_PUB:
+ case ZMQ_REQ:
+ case ZMQ_REP:
+ s = new socket_base_t (this);
+ break;
+ default:
+ // TODO: This should be EINVAL.
+ zmq_assert (false);
+ }
zmq_assert (s);
s->set_index (sockets.size ());
sockets.push_back (s);
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index ac5a88c..a26c280 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -140,6 +140,11 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
options.identity.assign ((const char*) optval_, optvallen_);
return 0;
+ case ZMQ_SUBSCRIBE:
+ case ZMQ_UNSUBSCRIBE:
+ errno = ENOTSUP;
+ return -1;
+
default:
errno = EINVAL;
return -1;
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 3f5774f..5711dbe 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -39,7 +39,7 @@ namespace zmq
public:
socket_base_t (class app_thread_t *parent_);
- ~socket_base_t ();
+ virtual ~socket_base_t ();
// Interface for communication with the API layer.
virtual int setsockopt (int option_, const void *optval_,
diff --git a/src/sub.cpp b/src/sub.cpp
new file mode 100644
index 0000000..954eb87
--- /dev/null
+++ b/src/sub.cpp
@@ -0,0 +1,82 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../c/zmq.h"
+
+#include "sub.hpp"
+#include "err.hpp"
+
+zmq::sub_t::sub_t (class app_thread_t *parent_) :
+ socket_base_t (parent_)
+{
+}
+
+zmq::sub_t::~sub_t ()
+{
+}
+
+int zmq::sub_t::setsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ == ZMQ_SUBSCRIBE) {
+ std::string subscription ((const char*) optval_, optvallen_);
+ subscriptions.insert (subscription);
+ return 0;
+ }
+
+ if (option_ == ZMQ_UNSUBSCRIBE) {
+ std::string subscription ((const char*) optval_, optvallen_);
+ subscriptions_t::iterator it = subscriptions.find (subscription);
+ if (it == subscriptions.end ()) {
+ errno = EINVAL;
+ return -1;
+ }
+ subscriptions.erase (it);
+ return 0;
+ }
+
+ return socket_base_t::setsockopt (option_, optval_, optvallen_);
+}
+
+int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)
+{
+ while (true) {
+
+ // Get a message.
+ int rc = socket_base_t::recv (msg_, flags_);
+
+ // If there's no message available, return immediately.
+ if (rc != 0 && errno == EAGAIN)
+ return -1;
+
+ // Check the message format.
+ // TODO: We should either ignore the message or drop the connection
+ // if the message doesn't conform with the expected format.
+ unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
+ zmq_assert (*data <= zmq_msg_size (msg_) - 1);
+
+ // Check whether the message matches at least one subscription.
+ std::string topic ((const char*) (data + 1), *data);
+ subscriptions_t::iterator it = subscriptions.find (topic);
+ if (it != subscriptions.end ())
+ break;
+ }
+
+ return 0;
+}
diff --git a/src/sub.hpp b/src/sub.hpp
new file mode 100644
index 0000000..1d4fdf9
--- /dev/null
+++ b/src/sub.hpp
@@ -0,0 +1,51 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_SUB_INCLUDED__
+#define __ZMQ_SUB_INCLUDED__
+
+#include <set>
+#include <string>
+
+#include "socket_base.hpp"
+
+namespace zmq
+{
+
+ class sub_t : public socket_base_t
+ {
+ public:
+
+ sub_t (class app_thread_t *parent_);
+ ~sub_t ();
+
+ // Overloads of API functions from socket_base_t.
+ int setsockopt (int option_, const void *optval_, size_t optvallen_);
+ int recv (struct zmq_msg_t *msg_, int flags_);
+
+ private:
+
+ // List of all the active subscriptions.
+ typedef std::multiset <std::string> subscriptions_t;
+ subscriptions_t subscriptions;
+ };
+
+}
+
+#endif