summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-12-04 23:14:38 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-12-04 23:14:38 +0100
commitc80e7b80cc726ca7c29493c2553c8d19792bb6e5 (patch)
tree9c740ee660e900766a6f7d7e8a48bb09a6b72c4b
parentabc8b5e40c55deb96e7674b15629f2affa4eb92a (diff)
XPUB and XSUB socket types added.
These are just placeholders. At the moment XPUB behaves th same as PUB and XSUB as SUB. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--include/zmq.h4
-rw-r--r--src/Makefile.am4
-rw-r--r--src/pub.cpp144
-rw-r--r--src/pub.hpp34
-rw-r--r--src/socket_base.cpp11
-rw-r--r--src/sub.cpp134
-rw-r--r--src/sub.hpp38
-rw-r--r--src/xpub.cpp171
-rw-r--r--src/xpub.hpp73
-rw-r--r--src/xsub.cpp161
-rw-r--r--src/xsub.hpp78
11 files changed, 506 insertions, 346 deletions
diff --git a/include/zmq.h b/include/zmq.h
index 692f1c1..997595b 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -68,7 +68,7 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
/* 0MQ errors. */
/******************************************************************************/
-/* A number random anough not to collide with different errno ranges on */
+/* A number random enough not to collide with different errno ranges on */
/* different OSes. The assumption is that error_t is at least 32-bit type. */
#define ZMQ_HAUSNUMERO 156384712
@@ -178,6 +178,8 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_XREP 6
#define ZMQ_PULL 7
#define ZMQ_PUSH 8
+#define ZMQ_XPUB 9
+#define ZMQ_XSUB 10
#define ZMQ_UPSTREAM ZMQ_PULL /* Old alias, remove in 3.x */
#define ZMQ_DOWNSTREAM ZMQ_PUSH /* Old alias, remove in 3.x */
diff --git a/src/Makefile.am b/src/Makefile.am
index a857ee7..eea7d35 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -126,8 +126,10 @@ libzmq_la_SOURCES = \
uuid.hpp \
windows.hpp \
wire.hpp \
+ xpub.hpp \
xrep.hpp \
xreq.hpp \
+ xsub.hpp \
ypipe.hpp \
yqueue.hpp \
zmq_connecter.hpp \
@@ -181,8 +183,10 @@ libzmq_la_SOURCES = \
transient_session.cpp \
trie.cpp \
uuid.cpp \
+ xpub.cpp \
xrep.cpp \
xreq.cpp \
+ xsub.cpp \
zmq.cpp \
zmq_connecter.cpp \
zmq_engine.cpp \
diff --git a/src/pub.cpp b/src/pub.cpp
index 84bd9e1..25d397c 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -17,155 +17,13 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "pub.hpp"
-#include "err.hpp"
-#include "msg_content.hpp"
-#include "pipe.hpp"
zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_) :
- socket_base_t (parent_, tid_),
- active (0),
- terminating (false)
+ xpub_t (parent_, tid_)
{
- options.type = ZMQ_PUB;
- options.requires_in = false;
- options.requires_out = true;
}
zmq::pub_t::~pub_t ()
{
- zmq_assert (pipes.empty ());
-}
-
-void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_, const blob_t &peer_identity_)
-{
- zmq_assert (!inpipe_);
-
- outpipe_->set_event_sink (this);
-
- pipes.push_back (outpipe_);
- pipes.swap (active, pipes.size () - 1);
- active++;
-
- if (terminating) {
- register_term_acks (1);
- outpipe_->terminate ();
- }
}
-
-void zmq::pub_t::process_term (int linger_)
-{
- terminating = true;
-
- // Start shutdown process for all the pipes.
- for (pipes_t::size_type i = 0; i != pipes.size (); i++)
- pipes [i]->terminate ();
-
- // Wait for pipes to terminate before terminating yourself.
- register_term_acks (pipes.size ());
-
- // Continue with the termination immediately.
- socket_base_t::process_term (linger_);
-}
-
-void zmq::pub_t::activated (writer_t *pipe_)
-{
- // Move the pipe to the list of active pipes.
- pipes.swap (pipes.index (pipe_), active);
- active++;
-}
-
-void zmq::pub_t::terminated (writer_t *pipe_)
-{
- // Remove the pipe from the list; adjust number of active pipes
- // accordingly.
- if (pipes.index (pipe_) < active)
- active--;
- pipes.erase (pipe_);
-
- // If we are already terminating, wait for one term ack less.
- if (terminating)
- unregister_term_ack ();
-}
-
-int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
-{
- // If there are no active pipes available, simply drop the message.
- if (active == 0) {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return 0;
- }
-
- msg_content_t *content = (msg_content_t*) msg_->content;
-
- // For VSMs the copying is straighforward.
- if (content == (msg_content_t*) ZMQ_VSM) {
- for (pipes_t::size_type i = 0; i < active;)
- if (write (pipes [i], msg_))
- i++;
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return 0;
- }
-
- // Optimisation for the case when there's only a single pipe
- // to send the message to - no refcount adjustment i.e. no atomic
- // operations are needed.
- if (active == 1) {
- if (!write (pipes [0], msg_)) {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- }
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return 0;
- }
-
- // There are at least 2 destinations for the message. That means we have
- // to deal with reference counting. First add N-1 references to
- // the content (we are holding one reference anyway, that's why -1).
- if (msg_->flags & ZMQ_MSG_SHARED)
- content->refcnt.add (active - 1);
- else {
- content->refcnt.set (active);
- msg_->flags |= ZMQ_MSG_SHARED;
- }
-
- // Push the message to all destinations.
- for (pipes_t::size_type i = 0; i < active;) {
- if (!write (pipes [i], msg_))
- content->refcnt.sub (1);
- else
- i++;
- }
-
- // Detach the original message from the data buffer.
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
-
- return 0;
-}
-
-bool zmq::pub_t::xhas_out ()
-{
- return true;
-}
-
-bool zmq::pub_t::write (class writer_t *pipe_, zmq_msg_t *msg_)
-{
- if (!pipe_->write (msg_)) {
- active--;
- pipes.swap (pipes.index (pipe_), active);
- return false;
- }
- if (!(msg_->flags & ZMQ_MSG_MORE))
- pipe_->flush ();
- return true;
-}
-
diff --git a/src/pub.hpp b/src/pub.hpp
index 270bda5..f398526 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -20,50 +20,20 @@
#ifndef __ZMQ_PUB_HPP_INCLUDED__
#define __ZMQ_PUB_HPP_INCLUDED__
-#include "socket_base.hpp"
-#include "array.hpp"
-#include "pipe.hpp"
+#include "xpub.hpp"
namespace zmq
{
- class pub_t : public socket_base_t, public i_writer_events
+ class pub_t : public xpub_t
{
public:
pub_t (class ctx_t *parent_, uint32_t tid_);
~pub_t ();
- // Implementations of virtual functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
- const blob_t &peer_identity_);
- int xsend (zmq_msg_t *msg_, int flags_);
- bool xhas_out ();
-
- // i_writer_events interface implementation.
- void activated (writer_t *pipe_);
- void terminated (writer_t *pipe_);
-
private:
- // Hook into the termination process.
- void process_term (int linger_);
-
- // Write the message to the pipe. Make the pipe inactive if writing
- // fails. In such a case false is returned.
- bool write (class writer_t *pipe_, zmq_msg_t *msg_);
-
- // Outbound pipes, i.e. those the socket is sending messages to.
- typedef array_t <class writer_t> pipes_t;
- pipes_t pipes;
-
- // Number of active pipes. All the active pipes are located at the
- // beginning of the pipes array.
- pipes_t::size_type active;
-
- // True if termination process is already underway.
- bool terminating;
-
pub_t (const pub_t&);
void operator = (const pub_t&);
};
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index bfaacb7..2fe7bfd 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -46,6 +46,8 @@
#include "ctx.hpp"
#include "platform.hpp"
#include "likely.hpp"
+#include "uuid.hpp"
+
#include "pair.hpp"
#include "pub.hpp"
#include "sub.hpp"
@@ -55,7 +57,8 @@
#include "push.hpp"
#include "xreq.hpp"
#include "xrep.hpp"
-#include "uuid.hpp"
+#include "xpub.hpp"
+#include "xsub.hpp"
zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
uint32_t tid_)
@@ -90,6 +93,12 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
case ZMQ_PUSH:
s = new (std::nothrow) push_t (parent_, tid_);
break;
+ case ZMQ_XPUB:
+ s = new (std::nothrow) xpub_t (parent_, tid_);
+ break;
+ case ZMQ_XSUB:
+ s = new (std::nothrow) xsub_t (parent_, tid_);
+ break;
default:
errno = EINVAL;
return NULL;
diff --git a/src/sub.cpp b/src/sub.cpp
index 096fbc7..f763558 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -17,145 +17,13 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include <string.h>
-
-#include "../include/zmq.h"
-
#include "sub.hpp"
-#include "err.hpp"
zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) :
- socket_base_t (parent_, tid_),
- fq (this),
- has_message (false),
- more (false)
+ xsub_t (parent_, tid_)
{
- options.type = ZMQ_SUB;
- options.requires_in = true;
- options.requires_out = false;
- zmq_msg_init (&message);
}
zmq::sub_t::~sub_t ()
{
- zmq_msg_close (&message);
-}
-
-void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_, const blob_t &peer_identity_)
-{
- zmq_assert (inpipe_ && !outpipe_);
- fq.attach (inpipe_);
-}
-
-void zmq::sub_t::process_term (int linger_)
-{
- fq.terminate ();
- socket_base_t::process_term (linger_);
-}
-
-int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ == ZMQ_SUBSCRIBE) {
- subscriptions.add ((unsigned char*) optval_, optvallen_);
- return 0;
- }
-
- if (option_ == ZMQ_UNSUBSCRIBE) {
- if (!subscriptions.rm ((unsigned char*) optval_, optvallen_)) {
- errno = EINVAL;
- return -1;
- }
- return 0;
- }
-
- errno = EINVAL;
- return -1;
-}
-
-int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
-{
- // If there's already a message prepared by a previous call to zmq_poll,
- // return it straight ahead.
- if (has_message) {
- zmq_msg_move (msg_, &message);
- has_message = false;
- more = msg_->flags & ZMQ_MSG_MORE;
- return 0;
- }
-
- // TODO: This can result in infinite loop in the case of continuous
- // stream of non-matching messages which breaks the non-blocking recv
- // semantics.
- while (true) {
-
- // Get a message using fair queueing algorithm.
- int rc = fq.recv (msg_, flags_);
-
- // If there's no message available, return immediately.
- // The same when error occurs.
- if (rc != 0)
- return -1;
-
- // Check whether the message matches at least one subscription.
- // Non-initial parts of the message are passed
- if (more || match (msg_)) {
- more = msg_->flags & ZMQ_MSG_MORE;
- return 0;
- }
-
- // Message doesn't match. Pop any remaining parts of the message
- // from the pipe.
- while (msg_->flags & ZMQ_MSG_MORE) {
- rc = fq.recv (msg_, ZMQ_NOBLOCK);
- zmq_assert (rc == 0);
- }
- }
-}
-
-bool zmq::sub_t::xhas_in ()
-{
- // There are subsequent parts of the partly-read message available.
- if (more)
- return true;
-
- // If there's already a message prepared by a previous call to zmq_poll,
- // return straight ahead.
- if (has_message)
- return true;
-
- // TODO: This can result in infinite loop in the case of continuous
- // stream of non-matching messages.
- while (true) {
-
- // Get a message using fair queueing algorithm.
- int rc = fq.recv (&message, ZMQ_NOBLOCK);
-
- // If there's no message available, return immediately.
- // The same when error occurs.
- if (rc != 0) {
- zmq_assert (errno == EAGAIN);
- return false;
- }
-
- // Check whether the message matches at least one subscription.
- if (match (&message)) {
- has_message = true;
- return true;
- }
-
- // Message doesn't match. Pop any remaining parts of the message
- // from the pipe.
- while (message.flags & ZMQ_MSG_MORE) {
- rc = fq.recv (&message, ZMQ_NOBLOCK);
- zmq_assert (rc == 0);
- }
- }
-}
-
-bool zmq::sub_t::match (zmq_msg_t *msg_)
-{
- return subscriptions.check ((unsigned char*) zmq_msg_data (msg_),
- zmq_msg_size (msg_));
}
diff --git a/src/sub.hpp b/src/sub.hpp
index 3f8ced0..0ea1fc4 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -20,54 +20,20 @@
#ifndef __ZMQ_SUB_HPP_INCLUDED__
#define __ZMQ_SUB_HPP_INCLUDED__
-#include "../include/zmq.h"
-
-#include "trie.hpp"
-#include "socket_base.hpp"
-#include "fq.hpp"
+#include "xsub.hpp"
namespace zmq
{
- class sub_t : public socket_base_t
+ class sub_t : public xsub_t
{
public:
sub_t (class ctx_t *parent_, uint32_t tid_);
~sub_t ();
- protected:
-
- // Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
- const blob_t &peer_identity_);
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
- int xrecv (zmq_msg_t *msg_, int flags_);
- bool xhas_in ();
-
private:
- // Hook into the termination process.
- void process_term (int linger_);
-
- // Check whether the message matches at least one subscription.
- bool match (zmq_msg_t *msg_);
-
- // Fair queueing object for inbound pipes.
- fq_t fq;
-
- // The repository of subscriptions.
- trie_t subscriptions;
-
- // If true, 'message' contains a matching message to return on the
- // next recv call.
- bool has_message;
- zmq_msg_t message;
-
- // If true, part of a multipart message was already received, but
- // there are following parts still waiting.
- bool more;
-
sub_t (const sub_t&);
void operator = (const sub_t&);
};
diff --git a/src/xpub.cpp b/src/xpub.cpp
new file mode 100644
index 0000000..1659d6f
--- /dev/null
+++ b/src/xpub.cpp
@@ -0,0 +1,171 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../include/zmq.h"
+
+#include "xpub.hpp"
+#include "err.hpp"
+#include "msg_content.hpp"
+#include "pipe.hpp"
+
+zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
+ socket_base_t (parent_, tid_),
+ active (0),
+ terminating (false)
+{
+ options.type = ZMQ_PUB;
+ options.requires_in = false;
+ options.requires_out = true;
+}
+
+zmq::xpub_t::~xpub_t ()
+{
+ zmq_assert (pipes.empty ());
+}
+
+void zmq::xpub_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_, const blob_t &peer_identity_)
+{
+ zmq_assert (!inpipe_);
+
+ outpipe_->set_event_sink (this);
+
+ pipes.push_back (outpipe_);
+ pipes.swap (active, pipes.size () - 1);
+ active++;
+
+ if (terminating) {
+ register_term_acks (1);
+ outpipe_->terminate ();
+ }
+}
+
+void zmq::xpub_t::process_term (int linger_)
+{
+ terminating = true;
+
+ // Start shutdown process for all the pipes.
+ for (pipes_t::size_type i = 0; i != pipes.size (); i++)
+ pipes [i]->terminate ();
+
+ // Wait for pipes to terminate before terminating yourself.
+ register_term_acks (pipes.size ());
+
+ // Continue with the termination immediately.
+ socket_base_t::process_term (linger_);
+}
+
+void zmq::xpub_t::activated (writer_t *pipe_)
+{
+ // Move the pipe to the list of active pipes.
+ pipes.swap (pipes.index (pipe_), active);
+ active++;
+}
+
+void zmq::xpub_t::terminated (writer_t *pipe_)
+{
+ // Remove the pipe from the list; adjust number of active pipes
+ // accordingly.
+ if (pipes.index (pipe_) < active)
+ active--;
+ pipes.erase (pipe_);
+
+ // If we are already terminating, wait for one term ack less.
+ if (terminating)
+ unregister_term_ack ();
+}
+
+int zmq::xpub_t::xsend (zmq_msg_t *msg_, int flags_)
+{
+ // If there are no active pipes available, simply drop the message.
+ if (active == 0) {
+ int rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
+ rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
+ msg_content_t *content = (msg_content_t*) msg_->content;
+
+ // For VSMs the copying is straighforward.
+ if (content == (msg_content_t*) ZMQ_VSM) {
+ for (pipes_t::size_type i = 0; i < active;)
+ if (write (pipes [i], msg_))
+ i++;
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
+ // Optimisation for the case when there's only a single pipe
+ // to send the message to - no refcount adjustment i.e. no atomic
+ // operations are needed.
+ if (active == 1) {
+ if (!write (pipes [0], msg_)) {
+ int rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
+ }
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
+ // There are at least 2 destinations for the message. That means we have
+ // to deal with reference counting. First add N-1 references to
+ // the content (we are holding one reference anyway, that's why -1).
+ if (msg_->flags & ZMQ_MSG_SHARED)
+ content->refcnt.add (active - 1);
+ else {
+ content->refcnt.set (active);
+ msg_->flags |= ZMQ_MSG_SHARED;
+ }
+
+ // Push the message to all destinations.
+ for (pipes_t::size_type i = 0; i < active;) {
+ if (!write (pipes [i], msg_))
+ content->refcnt.sub (1);
+ else
+ i++;
+ }
+
+ // Detach the original message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return 0;
+}
+
+bool zmq::xpub_t::xhas_out ()
+{
+ return true;
+}
+
+bool zmq::xpub_t::write (class writer_t *pipe_, zmq_msg_t *msg_)
+{
+ if (!pipe_->write (msg_)) {
+ active--;
+ pipes.swap (pipes.index (pipe_), active);
+ return false;
+ }
+ if (!(msg_->flags & ZMQ_MSG_MORE))
+ pipe_->flush ();
+ return true;
+}
+
diff --git a/src/xpub.hpp b/src/xpub.hpp
new file mode 100644
index 0000000..13dd405
--- /dev/null
+++ b/src/xpub.hpp
@@ -0,0 +1,73 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_XPUB_HPP_INCLUDED__
+#define __ZMQ_XPUB_HPP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "array.hpp"
+#include "pipe.hpp"
+
+namespace zmq
+{
+
+ class xpub_t : public socket_base_t, public i_writer_events
+ {
+ public:
+
+ xpub_t (class ctx_t *parent_, uint32_t tid_);
+ ~xpub_t ();
+
+ // Implementations of virtual functions from socket_base_t.
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
+ int xsend (zmq_msg_t *msg_, int flags_);
+ bool xhas_out ();
+
+ // i_writer_events interface implementation.
+ void activated (writer_t *pipe_);
+ void terminated (writer_t *pipe_);
+
+ private:
+
+ // Hook into the termination process.
+ void process_term (int linger_);
+
+ // Write the message to the pipe. Make the pipe inactive if writing
+ // fails. In such a case false is returned.
+ bool write (class writer_t *pipe_, zmq_msg_t *msg_);
+
+ // Outbound pipes, i.e. those the socket is sending messages to.
+ typedef array_t <class writer_t> pipes_t;
+ pipes_t pipes;
+
+ // Number of active pipes. All the active pipes are located at the
+ // beginning of the pipes array.
+ pipes_t::size_type active;
+
+ // True if termination process is already underway.
+ bool terminating;
+
+ xpub_t (const xpub_t&);
+ void operator = (const xpub_t&);
+ };
+
+}
+
+#endif
diff --git a/src/xsub.cpp b/src/xsub.cpp
new file mode 100644
index 0000000..16e1042
--- /dev/null
+++ b/src/xsub.cpp
@@ -0,0 +1,161 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <string.h>
+
+#include "../include/zmq.h"
+
+#include "xsub.hpp"
+#include "err.hpp"
+
+zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) :
+ socket_base_t (parent_, tid_),
+ fq (this),
+ has_message (false),
+ more (false)
+{
+ options.type = ZMQ_SUB;
+ options.requires_in = true;
+ options.requires_out = false;
+ zmq_msg_init (&message);
+}
+
+zmq::xsub_t::~xsub_t ()
+{
+ zmq_msg_close (&message);
+}
+
+void zmq::xsub_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_, const blob_t &peer_identity_)
+{
+ zmq_assert (inpipe_ && !outpipe_);
+ fq.attach (inpipe_);
+}
+
+void zmq::xsub_t::process_term (int linger_)
+{
+ fq.terminate ();
+ socket_base_t::process_term (linger_);
+}
+
+int zmq::xsub_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ == ZMQ_SUBSCRIBE) {
+ subscriptions.add ((unsigned char*) optval_, optvallen_);
+ return 0;
+ }
+
+ if (option_ == ZMQ_UNSUBSCRIBE) {
+ if (!subscriptions.rm ((unsigned char*) optval_, optvallen_)) {
+ errno = EINVAL;
+ return -1;
+ }
+ return 0;
+ }
+
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::xsub_t::xrecv (zmq_msg_t *msg_, int flags_)
+{
+ // If there's already a message prepared by a previous call to zmq_poll,
+ // return it straight ahead.
+ if (has_message) {
+ zmq_msg_move (msg_, &message);
+ has_message = false;
+ more = msg_->flags & ZMQ_MSG_MORE;
+ return 0;
+ }
+
+ // TODO: This can result in infinite loop in the case of continuous
+ // stream of non-matching messages which breaks the non-blocking recv
+ // semantics.
+ while (true) {
+
+ // Get a message using fair queueing algorithm.
+ int rc = fq.recv (msg_, flags_);
+
+ // If there's no message available, return immediately.
+ // The same when error occurs.
+ if (rc != 0)
+ return -1;
+
+ // Check whether the message matches at least one subscription.
+ // Non-initial parts of the message are passed
+ if (more || match (msg_)) {
+ more = msg_->flags & ZMQ_MSG_MORE;
+ return 0;
+ }
+
+ // Message doesn't match. Pop any remaining parts of the message
+ // from the pipe.
+ while (msg_->flags & ZMQ_MSG_MORE) {
+ rc = fq.recv (msg_, ZMQ_NOBLOCK);
+ zmq_assert (rc == 0);
+ }
+ }
+}
+
+bool zmq::xsub_t::xhas_in ()
+{
+ // There are subsequent parts of the partly-read message available.
+ if (more)
+ return true;
+
+ // If there's already a message prepared by a previous call to zmq_poll,
+ // return straight ahead.
+ if (has_message)
+ return true;
+
+ // TODO: This can result in infinite loop in the case of continuous
+ // stream of non-matching messages.
+ while (true) {
+
+ // Get a message using fair queueing algorithm.
+ int rc = fq.recv (&message, ZMQ_NOBLOCK);
+
+ // If there's no message available, return immediately.
+ // The same when error occurs.
+ if (rc != 0) {
+ zmq_assert (errno == EAGAIN);
+ return false;
+ }
+
+ // Check whether the message matches at least one subscription.
+ if (match (&message)) {
+ has_message = true;
+ return true;
+ }
+
+ // Message doesn't match. Pop any remaining parts of the message
+ // from the pipe.
+ while (message.flags & ZMQ_MSG_MORE) {
+ rc = fq.recv (&message, ZMQ_NOBLOCK);
+ zmq_assert (rc == 0);
+ }
+ }
+}
+
+bool zmq::xsub_t::match (zmq_msg_t *msg_)
+{
+ return subscriptions.check ((unsigned char*) zmq_msg_data (msg_),
+ zmq_msg_size (msg_));
+}
diff --git a/src/xsub.hpp b/src/xsub.hpp
new file mode 100644
index 0000000..3d7b08f
--- /dev/null
+++ b/src/xsub.hpp
@@ -0,0 +1,78 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_XSUB_HPP_INCLUDED__
+#define __ZMQ_XSUB_HPP_INCLUDED__
+
+#include "../include/zmq.h"
+
+#include "trie.hpp"
+#include "socket_base.hpp"
+#include "fq.hpp"
+
+namespace zmq
+{
+
+ class xsub_t : public socket_base_t
+ {
+ public:
+
+ xsub_t (class ctx_t *parent_, uint32_t tid_);
+ ~xsub_t ();
+
+ protected:
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+
+ private:
+
+ // Hook into the termination process.
+ void process_term (int linger_);
+
+ // Check whether the message matches at least one subscription.
+ bool match (zmq_msg_t *msg_);
+
+ // Fair queueing object for inbound pipes.
+ fq_t fq;
+
+ // The repository of subscriptions.
+ trie_t subscriptions;
+
+ // If true, 'message' contains a matching message to return on the
+ // next recv call.
+ bool has_message;
+ zmq_msg_t message;
+
+ // If true, part of a multipart message was already received, but
+ // there are following parts still waiting.
+ bool more;
+
+ xsub_t (const xsub_t&);
+ void operator = (const xsub_t&);
+ };
+
+}
+
+#endif
+