diff options
Diffstat (limited to 'src/xpub.cpp')
-rw-r--r-- | src/xpub.cpp | 127 |
1 files changed, 42 insertions, 85 deletions
diff --git a/src/xpub.cpp b/src/xpub.cpp index 9158cf4..4b41696 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -39,12 +39,41 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (pipe_); dist.attach (pipe_); - fq.attach (pipe_); + + // The pipe is active when attached. Let's read the subscriptions from + // it, if any. + xread_activated (pipe_); } void zmq::xpub_t::xread_activated (pipe_t *pipe_) { - fq.activated (pipe_); + // There are some subscriptions waiting. Let's process them. + msg_t sub; + sub.init (); + while (true) { + + // Grab next subscription. + if (!pipe_->read (&sub)) { + sub.close (); + return; + } + + // Apply the subscription to the trie. + unsigned char *data = (unsigned char*) sub.data (); + size_t size = sub.size (); + zmq_assert (size > 0 && (*data == 0 || *data == 1)); + bool unique; + if (*data == 0) + unique = subscriptions.rm (data + 1, size - 1, pipe_); + else + unique = subscriptions.add (data + 1, size - 1, pipe_); + + // If the subscription is not a duplicate store it so that it can be + // passed to used on next recv call. + if (unique && options.type != ZMQ_PUB) + pending.push_back (blob_t ((unsigned char*) sub.data (), + sub.size ())); + } } void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) @@ -60,31 +89,10 @@ void zmq::xpub_t::xterminated (pipe_t *pipe_) subscriptions.rm (pipe_, send_unsubscription, this); dist.terminated (pipe_); - fq.terminated (pipe_); } int zmq::xpub_t::xsend (msg_t *msg_, int flags_) -{ - // First, process any (un)subscriptions from downstream. - msg_t sub; - sub.init (); - while (true) { - - // Grab next subscription. - pipe_t *pipe; - int rc = fq.recvpipe (&sub, 0, &pipe); - if (rc != 0 && errno == EAGAIN) - break; - errno_assert (rc == 0); - - // Apply the subscription to the trie. If it's not a duplicate, - // store it so that it can be passed to used on next recv call. - if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB) - pending.push_back (blob_t ((unsigned char*) sub.data (), - sub.size ())); - } - sub.close (); - +{ return dist.send (msg_, flags_); } @@ -96,75 +104,24 @@ bool zmq::xpub_t::xhas_out () int zmq::xpub_t::xrecv (msg_t *msg_, int flags_) { // If there is at least one - if (!pending.empty ()) { - int rc = msg_->close (); - errno_assert (rc == 0); - rc = msg_->init_size (pending.front ().size ()); - errno_assert (rc == 0); - memcpy (msg_->data (), pending.front ().data (), - pending.front ().size ()); - pending.pop_front (); - return 0; - } - - // Grab and apply next subscription. - pipe_t *pipe; - int rc = fq.recvpipe (msg_, 0, &pipe); - if (rc != 0) - return -1; - if (!apply_subscription (msg_, pipe)) { -// TODO: This should be a loop rather! - msg_->close (); - msg_->init (); + if (pending.empty ()) { errno = EAGAIN; return -1; } + + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init_size (pending.front ().size ()); + errno_assert (rc == 0); + memcpy (msg_->data (), pending.front ().data (), + pending.front ().size ()); + pending.pop_front (); return 0; } bool zmq::xpub_t::xhas_in () { - if (!pending.empty ()) - return true; - - // Even if there are subscriptions in the fair-queuer they may be - // duplicates. Thus, we have to check by hand wheter there is any - // subscription available to pass upstream. - // First, process any (un)subscriptions from downstream. - msg_t sub; - sub.init (); - while (true) { - - // Grab next subscription. - pipe_t *pipe; - int rc = fq.recvpipe (&sub, 0, &pipe); - if (rc != 0 && errno == EAGAIN) { - sub.close (); - return false; - } - errno_assert (rc == 0); - - // Apply the subscription to the trie. If it's not a duplicate store - // it so that it can be passed to used on next recv call. - if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB) { - pending.push_back (blob_t ((unsigned char*) sub.data (), - sub.size ())); - sub.close (); - return true; - } - } -} - -bool zmq::xpub_t::apply_subscription (msg_t *sub_, pipe_t *pipe_) -{ - unsigned char *data = (unsigned char*) sub_->data (); - size_t size = sub_->size (); - zmq_assert (size > 0 && (*data == 0 || *data == 1)); - - if (*data == 0) - return subscriptions.rm (data + 1, size - 1, pipe_); - else - return subscriptions.add (data + 1, size - 1, pipe_); + return !pending.empty (); } void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, |