From b3f32e219ec15a582b6fd03b155e0861443690f6 Mon Sep 17 00:00:00 2001
From: Martin Sustrik <sustrik@fastmq.commkdir>
Date: Thu, 10 Sep 2009 12:00:47 +0200
Subject: ZMQII-3: cheap and nasty implementation of message filtering

---
 c/zmq.h             |  2 ++
 src/Makefile.am     |  2 ++
 src/app_thread.cpp  | 18 ++++++++++--
 src/socket_base.cpp |  5 ++++
 src/socket_base.hpp |  2 +-
 src/sub.cpp         | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/sub.hpp         | 51 +++++++++++++++++++++++++++++++++
 7 files changed, 159 insertions(+), 3 deletions(-)
 create mode 100644 src/sub.cpp
 create mode 100644 src/sub.hpp

diff --git a/c/zmq.h b/c/zmq.h
index cc165d1..df6e04c 100644
--- a/c/zmq.h
+++ b/c/zmq.h
@@ -50,6 +50,8 @@ extern "C" {
 #define ZMQ_MASK 4
 #define ZMQ_AFFINITY 5
 #define ZMQ_IDENTITY 6
+#define ZMQ_SUBSCRIBE 7
+#define ZMQ_UNSUBSCRIBE 8
 
 //  The operation should be performed in non-blocking mode. I.e. if it cannot
 //  be processed immediately, error should be returned with errno set to EAGAIN.
diff --git a/src/Makefile.am b/src/Makefile.am
index 83670f2..cc71eba 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -53,6 +53,7 @@ libzmq_la_SOURCES = \
     simple_semaphore.hpp \
     socket_base.hpp \
     stdint.hpp \
+    sub.hpp \
     tcp_connecter.hpp \
     tcp_listener.hpp \
     tcp_socket.hpp \
@@ -88,6 +89,7 @@ libzmq_la_SOURCES = \
     select.cpp \
     session.cpp \
     socket_base.cpp \
+    sub.cpp \
     tcp_connecter.cpp \
     tcp_listener.cpp \
     tcp_socket.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index bdceca5..2bcc724 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -35,6 +35,7 @@
 #include "pipe.hpp"
 #include "config.hpp"
 #include "socket_base.hpp"
+#include "sub.hpp"
 
 //  If the RDTSC is available we use it to prevent excessive
 //  polling for commands. The nice thing here is that it will work on any
@@ -135,8 +136,21 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
 
 zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
 {
-    //  TODO: type is ignored for the time being.
-    socket_base_t *s = new socket_base_t (this);
+    socket_base_t *s = NULL;
+    switch (type_) {
+    case ZMQ_SUB:
+        s = new sub_t (this);
+        break;
+    case ZMQ_P2P:
+    case ZMQ_PUB:
+    case ZMQ_REQ:
+    case ZMQ_REP:
+        s = new socket_base_t (this);
+        break;
+    default:
+        //  TODO: This should be EINVAL.
+        zmq_assert (false);
+    }
     zmq_assert (s);
     s->set_index (sockets.size ());
     sockets.push_back (s);
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index ac5a88c..a26c280 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -140,6 +140,11 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
         options.identity.assign ((const char*) optval_, optvallen_);
         return 0;
 
+    case ZMQ_SUBSCRIBE:
+    case ZMQ_UNSUBSCRIBE:
+        errno = ENOTSUP;
+        return -1;
+
     default:
         errno = EINVAL;
         return -1;
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 3f5774f..5711dbe 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -39,7 +39,7 @@ namespace zmq
     public:
 
         socket_base_t (class app_thread_t *parent_);
-        ~socket_base_t ();
+        virtual ~socket_base_t ();
 
         //  Interface for communication with the API layer.
         virtual int setsockopt (int option_, const void *optval_,
diff --git a/src/sub.cpp b/src/sub.cpp
new file mode 100644
index 0000000..954eb87
--- /dev/null
+++ b/src/sub.cpp
@@ -0,0 +1,82 @@
+/*
+    Copyright (c) 2007-2009 FastMQ Inc.
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../c/zmq.h"
+
+#include "sub.hpp"
+#include "err.hpp"
+
+zmq::sub_t::sub_t (class app_thread_t *parent_) :
+    socket_base_t (parent_)
+{
+}
+
+zmq::sub_t::~sub_t ()
+{
+}
+
+int zmq::sub_t::setsockopt (int option_, const void *optval_,
+    size_t optvallen_)
+{
+    if (option_ == ZMQ_SUBSCRIBE) {
+        std::string subscription ((const char*) optval_, optvallen_);
+        subscriptions.insert (subscription);
+        return 0;
+    }
+    
+    if (option_ == ZMQ_UNSUBSCRIBE) {
+        std::string subscription ((const char*) optval_, optvallen_);
+        subscriptions_t::iterator it = subscriptions.find (subscription);
+        if (it == subscriptions.end ()) {
+            errno = EINVAL;
+            return -1;
+        }
+        subscriptions.erase (it);
+        return 0;
+    }
+
+    return socket_base_t::setsockopt (option_, optval_, optvallen_);
+}
+
+int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)
+{
+    while (true) {
+
+        //  Get a message.
+        int rc = socket_base_t::recv (msg_, flags_);
+
+        //  If there's no message available, return immediately.
+        if (rc != 0 && errno == EAGAIN)
+            return -1;
+
+        //  Check the message format.
+        //  TODO: We should either ignore the message or drop the connection
+        //  if the message doesn't conform with the expected format.
+        unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
+        zmq_assert (*data <= zmq_msg_size (msg_) - 1);
+
+        //  Check whether the message matches at least one subscription.
+        std::string topic ((const char*) (data + 1), *data);
+        subscriptions_t::iterator it = subscriptions.find (topic);
+        if (it != subscriptions.end ())
+            break;
+    }
+
+    return 0;
+}
diff --git a/src/sub.hpp b/src/sub.hpp
new file mode 100644
index 0000000..1d4fdf9
--- /dev/null
+++ b/src/sub.hpp
@@ -0,0 +1,51 @@
+/*
+    Copyright (c) 2007-2009 FastMQ Inc.
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_SUB_INCLUDED__
+#define __ZMQ_SUB_INCLUDED__
+
+#include <set>
+#include <string>
+
+#include "socket_base.hpp"
+
+namespace zmq
+{
+
+    class sub_t : public socket_base_t
+    {
+    public:
+
+        sub_t (class app_thread_t *parent_);
+        ~sub_t ();
+
+        //  Overloads of API functions from socket_base_t.
+        int setsockopt (int option_, const void *optval_, size_t optvallen_);
+        int recv (struct zmq_msg_t *msg_, int flags_);
+
+    private:
+
+        //  List of all the active subscriptions.
+        typedef std::multiset <std::string> subscriptions_t;
+        subscriptions_t subscriptions;
+    };
+
+}
+
+#endif
-- 
cgit v1.2.3