From 0b59866a84f733e5a53b0d2f32570581691747ef Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 30 May 2011 10:07:34 +0200 Subject: Patches from sub-forward branch incorporated Signed-off-by: Martin Sustrik --- src/xpub.cpp | 120 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 117 insertions(+), 3 deletions(-) (limited to 'src/xpub.cpp') diff --git a/src/xpub.cpp b/src/xpub.cpp index d5dba9f..9158cf4 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -18,6 +18,8 @@ along with this program. If not, see . */ +#include + #include "xpub.hpp" #include "pipe.hpp" #include "err.hpp" @@ -37,6 +39,12 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (pipe_); dist.attach (pipe_); + fq.attach (pipe_); +} + +void zmq::xpub_t::xread_activated (pipe_t *pipe_) +{ + fq.activated (pipe_); } void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) @@ -46,11 +54,37 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) void zmq::xpub_t::xterminated (pipe_t *pipe_) { + // Remove the pipe from the trie. If there are topics that nobody + // is interested in anymore, send corresponding unsubscriptions + // upstream. + subscriptions.rm (pipe_, send_unsubscription, this); + dist.terminated (pipe_); + fq.terminated (pipe_); } int zmq::xpub_t::xsend (msg_t *msg_, int flags_) { + // First, process any (un)subscriptions from downstream. + msg_t sub; + sub.init (); + while (true) { + + // Grab next subscription. + pipe_t *pipe; + int rc = fq.recvpipe (&sub, 0, &pipe); + if (rc != 0 && errno == EAGAIN) + break; + errno_assert (rc == 0); + + // Apply the subscription to the trie. If it's not a duplicate, + // store it so that it can be passed to used on next recv call. + if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB) + pending.push_back (blob_t ((unsigned char*) sub.data (), + sub.size ())); + } + sub.close (); + return dist.send (msg_, flags_); } @@ -61,12 +95,92 @@ bool zmq::xpub_t::xhas_out () int zmq::xpub_t::xrecv (msg_t *msg_, int flags_) { - errno = EAGAIN; - return -1; + // If there is at least one + if (!pending.empty ()) { + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init_size (pending.front ().size ()); + errno_assert (rc == 0); + memcpy (msg_->data (), pending.front ().data (), + pending.front ().size ()); + pending.pop_front (); + return 0; + } + + // Grab and apply next subscription. + pipe_t *pipe; + int rc = fq.recvpipe (msg_, 0, &pipe); + if (rc != 0) + return -1; + if (!apply_subscription (msg_, pipe)) { +// TODO: This should be a loop rather! + msg_->close (); + msg_->init (); + errno = EAGAIN; + return -1; + } + return 0; } bool zmq::xpub_t::xhas_in () { - return false; + if (!pending.empty ()) + return true; + + // Even if there are subscriptions in the fair-queuer they may be + // duplicates. Thus, we have to check by hand wheter there is any + // subscription available to pass upstream. + // First, process any (un)subscriptions from downstream. + msg_t sub; + sub.init (); + while (true) { + + // Grab next subscription. + pipe_t *pipe; + int rc = fq.recvpipe (&sub, 0, &pipe); + if (rc != 0 && errno == EAGAIN) { + sub.close (); + return false; + } + errno_assert (rc == 0); + + // Apply the subscription to the trie. If it's not a duplicate store + // it so that it can be passed to used on next recv call. + if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB) { + pending.push_back (blob_t ((unsigned char*) sub.data (), + sub.size ())); + sub.close (); + return true; + } + } +} + +bool zmq::xpub_t::apply_subscription (msg_t *sub_, pipe_t *pipe_) +{ + unsigned char *data = (unsigned char*) sub_->data (); + size_t size = sub_->size (); + zmq_assert (size > 0 && (*data == 0 || *data == 1)); + + if (*data == 0) + return subscriptions.rm (data + 1, size - 1, pipe_); + else + return subscriptions.add (data + 1, size - 1, pipe_); +} + +void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, + void *arg_) +{ + xpub_t *self = (xpub_t*) arg_; + + if (self->options.type != ZMQ_PUB) { + + // Place the unsubscription to the queue of pending (un)sunscriptions + // to be retrived by the user later on. + xpub_t *self = (xpub_t*) arg_; + blob_t unsub (size_ + 1, 0); + unsub [0] = 0; + memcpy (&unsub [1], data_, size_); + self->pending.push_back (unsub); + } } -- cgit v1.2.3