diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-05-28 09:02:21 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-05-28 09:02:21 +0200 |
commit | 311fb0d852374e769d8ff791c9df38f0464960c6 (patch) | |
tree | a375bed4c85bbbdb57e7c749997b95e6062ef18c /src/xsub.cpp | |
parent | 718885fdcd7af797f940078ca8c22aebab93c8bb (diff) |
Subscription matching moved from XSUB to SUB socket
This patch will prevent duplicate matching in devices in the future.
Instead of matching in both XPUB and XSUB, it'll happen only
in XPUB. Receiver endpoint will still filter messages via SUB
socket.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xsub.cpp')
-rw-r--r-- | src/xsub.cpp | 107 |
1 files changed, 6 insertions, 101 deletions
diff --git a/src/xsub.cpp b/src/xsub.cpp index c5f610f..6928d82 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -24,19 +24,13 @@ #include "err.hpp" zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_), - has_message (false), - more (false) + socket_base_t (parent_, tid_) { options.type = ZMQ_XSUB; - int rc = message.init (); - errno_assert (rc == 0); } zmq::xsub_t::~xsub_t () { - int rc = message.close (); - errno_assert (rc == 0); } void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) @@ -57,21 +51,8 @@ void zmq::xsub_t::xterminated (pipe_t *pipe_) int zmq::xsub_t::xsend (msg_t *msg_, int options_) { - size_t size = msg_->size (); - unsigned char *data = (unsigned char*) msg_->data (); - - // Malformed subscriptions are dropped silently. - if (size >= 1) { - - // Process a subscription. - if (*data == 1) - subscriptions.add (data + 1, size - 1); - - // Process an unsubscription. Invalid unsubscription is ignored. - if (*data == 0) - subscriptions.rm (data + 1, size - 1); - } - + // TODO: Once we'll send the subscription upstream here. For now + // just empty the message. int rc = msg_->close (); errno_assert (rc == 0); rc = msg_->init (); @@ -85,89 +66,13 @@ bool zmq::xsub_t::xhas_out () return true; } -int zmq::xsub_t::xrecv (msg_t *msg_, int flags_) +int zmq::xsub_t::xrecv (class msg_t *msg_, int flags_) { - // If there's already a message prepared by a previous call to zmq_poll, - // return it straight ahead. - if (has_message) { - int rc = msg_->move (message); - errno_assert (rc == 0); - has_message = false; - more = msg_->flags () & msg_t::more; - return 0; - } - - // TODO: This can result in infinite loop in the case of continuous - // stream of non-matching messages which breaks the non-blocking recv - // semantics. - while (true) { - - // Get a message using fair queueing algorithm. - int rc = fq.recv (msg_, flags_); - - // If there's no message available, return immediately. - // The same when error occurs. - if (rc != 0) - return -1; - - // Check whether the message matches at least one subscription. - // Non-initial parts of the message are passed - if (more || match (msg_)) { - more = msg_->flags () & msg_t::more; - return 0; - } - - // Message doesn't match. Pop any remaining parts of the message - // from the pipe. - while (msg_->flags () & msg_t::more) { - rc = fq.recv (msg_, ZMQ_DONTWAIT); - zmq_assert (rc == 0); - } - } + return fq.recv (msg_, flags_); } bool zmq::xsub_t::xhas_in () { - // There are subsequent parts of the partly-read message available. - if (more) - return true; - - // If there's already a message prepared by a previous call to zmq_poll, - // return straight ahead. - if (has_message) - return true; - - // TODO: This can result in infinite loop in the case of continuous - // stream of non-matching messages. - while (true) { - - // Get a message using fair queueing algorithm. - int rc = fq.recv (&message, ZMQ_DONTWAIT); - - // If there's no message available, return immediately. - // The same when error occurs. - if (rc != 0) { - zmq_assert (errno == EAGAIN); - return false; - } - - // Check whether the message matches at least one subscription. - if (match (&message)) { - has_message = true; - return true; - } - - // Message doesn't match. Pop any remaining parts of the message - // from the pipe. - while (message.flags () & msg_t::more) { - rc = fq.recv (&message, ZMQ_DONTWAIT); - zmq_assert (rc == 0); - } - } -} - -bool zmq::xsub_t::match (msg_t *msg_) -{ - return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ()); + return fq.has_in (); } |