From 311fb0d852374e769d8ff791c9df38f0464960c6 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 28 May 2011 09:02:21 +0200 Subject: Subscription matching moved from XSUB to SUB socket This patch will prevent duplicate matching in devices in the future. Instead of matching in both XPUB and XSUB, it'll happen only in XPUB. Receiver endpoint will still filter messages via SUB socket. Signed-off-by: Martin Sustrik --- src/sub.cpp | 111 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 109 insertions(+), 2 deletions(-) (limited to 'src/sub.cpp') diff --git a/src/sub.cpp b/src/sub.cpp index 2d6ade6..11c4532 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -22,19 +22,39 @@ #include "msg.hpp" zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) : - xsub_t (parent_, tid_) + xsub_t (parent_, tid_), + has_message (false), + more (false) { options.type = ZMQ_SUB; + int rc = message.init (); + errno_assert (rc == 0); } zmq::sub_t::~sub_t () { + int rc = message.close (); + errno_assert (rc == 0); } int zmq::sub_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { - if (option_ != ZMQ_SUBSCRIBE && option_ != ZMQ_UNSUBSCRIBE) { + // Process a subscription. + if (option_ == ZMQ_SUBSCRIBE) + subscriptions.add ((unsigned char*) optval_, optvallen_); + + // Process an unsubscription. Return error if there is no corresponding + // subscription. + else if (option_ == ZMQ_UNSUBSCRIBE) { + if (!subscriptions.rm ((unsigned char*) optval_, optvallen_)) { + errno = EINVAL; + return -1; + } + } + + // Unknow option. + else { errno = EINVAL; return -1; } @@ -74,3 +94,90 @@ bool zmq::sub_t::xhas_out () // Overload the XSUB's send. return false; } + +int zmq::sub_t::xrecv (msg_t *msg_, int flags_) +{ + // If there's already a message prepared by a previous call to zmq_poll, + // return it straight ahead. + if (has_message) { + int rc = msg_->move (message); + errno_assert (rc == 0); + has_message = false; + more = msg_->flags () & msg_t::more; + return 0; + } + + // TODO: This can result in infinite loop in the case of continuous + // stream of non-matching messages which breaks the non-blocking recv + // semantics. + while (true) { + + // Get a message using fair queueing algorithm. + int rc = xsub_t::xrecv (msg_, flags_); + + // If there's no message available, return immediately. + // The same when error occurs. + if (rc != 0) + return -1; + + // Check whether the message matches at least one subscription. + // Non-initial parts of the message are passed + if (more || match (msg_)) { + more = msg_->flags () & msg_t::more; + return 0; + } + + // Message doesn't match. Pop any remaining parts of the message + // from the pipe. + while (msg_->flags () & msg_t::more) { + rc = xsub_t::xrecv (msg_, ZMQ_DONTWAIT); + zmq_assert (rc == 0); + } + } +} + +bool zmq::sub_t::xhas_in () +{ + // There are subsequent parts of the partly-read message available. + if (more) + return true; + + // If there's already a message prepared by a previous call to zmq_poll, + // return straight ahead. + if (has_message) + return true; + + // TODO: This can result in infinite loop in the case of continuous + // stream of non-matching messages. + while (true) { + + // Get a message using fair queueing algorithm. + int rc = xsub_t::xrecv (&message, ZMQ_DONTWAIT); + + // If there's no message available, return immediately. + // The same when error occurs. + if (rc != 0) { + zmq_assert (errno == EAGAIN); + return false; + } + + // Check whether the message matches at least one subscription. + if (match (&message)) { + has_message = true; + return true; + } + + // Message doesn't match. Pop any remaining parts of the message + // from the pipe. + while (message.flags () & msg_t::more) { + rc = xsub_t::xrecv (&message, ZMQ_DONTWAIT); + zmq_assert (rc == 0); + } + } +} + +bool zmq::sub_t::match (msg_t *msg_) +{ + return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ()); +} + -- cgit v1.2.3