diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-05-31 16:21:17 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-05-31 16:21:17 +0200 |
commit | ee7313b4d896e9f7ff6a035395b20f617e4ff796 (patch) | |
tree | 3406da465b5a51e379fc5295a949c3aaac69d820 /src | |
parent | a24a7c15a824bb48da38809bff9416673dc5a176 (diff) |
Subscriptions are processed immediately in XPUB socket
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/xpub.cpp | 127 | ||||
-rw-r--r-- | src/xpub.hpp | 8 |
2 files changed, 42 insertions, 93 deletions
diff --git a/src/xpub.cpp b/src/xpub.cpp index 9158cf4..4b41696 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -39,12 +39,41 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (pipe_); dist.attach (pipe_); - fq.attach (pipe_); + + // The pipe is active when attached. Let's read the subscriptions from + // it, if any. + xread_activated (pipe_); } void zmq::xpub_t::xread_activated (pipe_t *pipe_) { - fq.activated (pipe_); + // There are some subscriptions waiting. Let's process them. + msg_t sub; + sub.init (); + while (true) { + + // Grab next subscription. + if (!pipe_->read (&sub)) { + sub.close (); + return; + } + + // Apply the subscription to the trie. + unsigned char *data = (unsigned char*) sub.data (); + size_t size = sub.size (); + zmq_assert (size > 0 && (*data == 0 || *data == 1)); + bool unique; + if (*data == 0) + unique = subscriptions.rm (data + 1, size - 1, pipe_); + else + unique = subscriptions.add (data + 1, size - 1, pipe_); + + // If the subscription is not a duplicate store it so that it can be + // passed to used on next recv call. + if (unique && options.type != ZMQ_PUB) + pending.push_back (blob_t ((unsigned char*) sub.data (), + sub.size ())); + } } void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) @@ -60,31 +89,10 @@ void zmq::xpub_t::xterminated (pipe_t *pipe_) subscriptions.rm (pipe_, send_unsubscription, this); dist.terminated (pipe_); - fq.terminated (pipe_); } int zmq::xpub_t::xsend (msg_t *msg_, int flags_) -{ - // First, process any (un)subscriptions from downstream. - msg_t sub; - sub.init (); - while (true) { - - // Grab next subscription. - pipe_t *pipe; - int rc = fq.recvpipe (&sub, 0, &pipe); - if (rc != 0 && errno == EAGAIN) - break; - errno_assert (rc == 0); - - // Apply the subscription to the trie. If it's not a duplicate, - // store it so that it can be passed to used on next recv call. - if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB) - pending.push_back (blob_t ((unsigned char*) sub.data (), - sub.size ())); - } - sub.close (); - +{ return dist.send (msg_, flags_); } @@ -96,75 +104,24 @@ bool zmq::xpub_t::xhas_out () int zmq::xpub_t::xrecv (msg_t *msg_, int flags_) { // If there is at least one - if (!pending.empty ()) { - int rc = msg_->close (); - errno_assert (rc == 0); - rc = msg_->init_size (pending.front ().size ()); - errno_assert (rc == 0); - memcpy (msg_->data (), pending.front ().data (), - pending.front ().size ()); - pending.pop_front (); - return 0; - } - - // Grab and apply next subscription. - pipe_t *pipe; - int rc = fq.recvpipe (msg_, 0, &pipe); - if (rc != 0) - return -1; - if (!apply_subscription (msg_, pipe)) { -// TODO: This should be a loop rather! - msg_->close (); - msg_->init (); + if (pending.empty ()) { errno = EAGAIN; return -1; } + + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init_size (pending.front ().size ()); + errno_assert (rc == 0); + memcpy (msg_->data (), pending.front ().data (), + pending.front ().size ()); + pending.pop_front (); return 0; } bool zmq::xpub_t::xhas_in () { - if (!pending.empty ()) - return true; - - // Even if there are subscriptions in the fair-queuer they may be - // duplicates. Thus, we have to check by hand wheter there is any - // subscription available to pass upstream. - // First, process any (un)subscriptions from downstream. - msg_t sub; - sub.init (); - while (true) { - - // Grab next subscription. - pipe_t *pipe; - int rc = fq.recvpipe (&sub, 0, &pipe); - if (rc != 0 && errno == EAGAIN) { - sub.close (); - return false; - } - errno_assert (rc == 0); - - // Apply the subscription to the trie. If it's not a duplicate store - // it so that it can be passed to used on next recv call. - if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB) { - pending.push_back (blob_t ((unsigned char*) sub.data (), - sub.size ())); - sub.close (); - return true; - } - } -} - -bool zmq::xpub_t::apply_subscription (msg_t *sub_, pipe_t *pipe_) -{ - unsigned char *data = (unsigned char*) sub_->data (); - size_t size = sub_->size (); - zmq_assert (size > 0 && (*data == 0 || *data == 1)); - - if (*data == 0) - return subscriptions.rm (data + 1, size - 1, pipe_); - else - return subscriptions.add (data + 1, size - 1, pipe_); + return !pending.empty (); } void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, diff --git a/src/xpub.hpp b/src/xpub.hpp index b824548..c5d64b5 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -28,7 +28,6 @@ #include "array.hpp" #include "blob.hpp" #include "dist.hpp" -#include "fq.hpp" namespace zmq { @@ -53,10 +52,6 @@ namespace zmq private: - // Applies the subscription to the trie. Return false if it is a - // duplicate. - bool apply_subscription (class msg_t *sub_, class pipe_t *pipe_); - // Function to be applied to the trie to send all the subsciptions // upstream. static void send_unsubscription (unsigned char *data_, size_t size_, @@ -68,9 +63,6 @@ namespace zmq // Distributor of messages holding the list of outbound pipes. dist_t dist; - // Object to fair-queue the subscription requests. - fq_t fq; - // List of pending (un)subscriptions, ie. those that were already // applied to the trie, but not yet received by the user. typedef std::deque <blob_t> pending_t; |