summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-05-31 16:21:17 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-05-31 16:21:17 +0200
commitee7313b4d896e9f7ff6a035395b20f617e4ff796 (patch)
tree3406da465b5a51e379fc5295a949c3aaac69d820 /src
parenta24a7c15a824bb48da38809bff9416673dc5a176 (diff)
Subscriptions are processed immediately in XPUB socket
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/xpub.cpp127
-rw-r--r--src/xpub.hpp8
2 files changed, 42 insertions, 93 deletions
diff --git a/src/xpub.cpp b/src/xpub.cpp
index 9158cf4..4b41696 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -39,12 +39,41 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
zmq_assert (pipe_);
dist.attach (pipe_);
- fq.attach (pipe_);
+
+ // The pipe is active when attached. Let's read the subscriptions from
+ // it, if any.
+ xread_activated (pipe_);
}
void zmq::xpub_t::xread_activated (pipe_t *pipe_)
{
- fq.activated (pipe_);
+ // There are some subscriptions waiting. Let's process them.
+ msg_t sub;
+ sub.init ();
+ while (true) {
+
+ // Grab next subscription.
+ if (!pipe_->read (&sub)) {
+ sub.close ();
+ return;
+ }
+
+ // Apply the subscription to the trie.
+ unsigned char *data = (unsigned char*) sub.data ();
+ size_t size = sub.size ();
+ zmq_assert (size > 0 && (*data == 0 || *data == 1));
+ bool unique;
+ if (*data == 0)
+ unique = subscriptions.rm (data + 1, size - 1, pipe_);
+ else
+ unique = subscriptions.add (data + 1, size - 1, pipe_);
+
+ // If the subscription is not a duplicate store it so that it can be
+ // passed to used on next recv call.
+ if (unique && options.type != ZMQ_PUB)
+ pending.push_back (blob_t ((unsigned char*) sub.data (),
+ sub.size ()));
+ }
}
void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
@@ -60,31 +89,10 @@ void zmq::xpub_t::xterminated (pipe_t *pipe_)
subscriptions.rm (pipe_, send_unsubscription, this);
dist.terminated (pipe_);
- fq.terminated (pipe_);
}
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
-{
- // First, process any (un)subscriptions from downstream.
- msg_t sub;
- sub.init ();
- while (true) {
-
- // Grab next subscription.
- pipe_t *pipe;
- int rc = fq.recvpipe (&sub, 0, &pipe);
- if (rc != 0 && errno == EAGAIN)
- break;
- errno_assert (rc == 0);
-
- // Apply the subscription to the trie. If it's not a duplicate,
- // store it so that it can be passed to used on next recv call.
- if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB)
- pending.push_back (blob_t ((unsigned char*) sub.data (),
- sub.size ()));
- }
- sub.close ();
-
+{
return dist.send (msg_, flags_);
}
@@ -96,75 +104,24 @@ bool zmq::xpub_t::xhas_out ()
int zmq::xpub_t::xrecv (msg_t *msg_, int flags_)
{
// If there is at least one
- if (!pending.empty ()) {
- int rc = msg_->close ();
- errno_assert (rc == 0);
- rc = msg_->init_size (pending.front ().size ());
- errno_assert (rc == 0);
- memcpy (msg_->data (), pending.front ().data (),
- pending.front ().size ());
- pending.pop_front ();
- return 0;
- }
-
- // Grab and apply next subscription.
- pipe_t *pipe;
- int rc = fq.recvpipe (msg_, 0, &pipe);
- if (rc != 0)
- return -1;
- if (!apply_subscription (msg_, pipe)) {
-// TODO: This should be a loop rather!
- msg_->close ();
- msg_->init ();
+ if (pending.empty ()) {
errno = EAGAIN;
return -1;
}
+
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ rc = msg_->init_size (pending.front ().size ());
+ errno_assert (rc == 0);
+ memcpy (msg_->data (), pending.front ().data (),
+ pending.front ().size ());
+ pending.pop_front ();
return 0;
}
bool zmq::xpub_t::xhas_in ()
{
- if (!pending.empty ())
- return true;
-
- // Even if there are subscriptions in the fair-queuer they may be
- // duplicates. Thus, we have to check by hand wheter there is any
- // subscription available to pass upstream.
- // First, process any (un)subscriptions from downstream.
- msg_t sub;
- sub.init ();
- while (true) {
-
- // Grab next subscription.
- pipe_t *pipe;
- int rc = fq.recvpipe (&sub, 0, &pipe);
- if (rc != 0 && errno == EAGAIN) {
- sub.close ();
- return false;
- }
- errno_assert (rc == 0);
-
- // Apply the subscription to the trie. If it's not a duplicate store
- // it so that it can be passed to used on next recv call.
- if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB) {
- pending.push_back (blob_t ((unsigned char*) sub.data (),
- sub.size ()));
- sub.close ();
- return true;
- }
- }
-}
-
-bool zmq::xpub_t::apply_subscription (msg_t *sub_, pipe_t *pipe_)
-{
- unsigned char *data = (unsigned char*) sub_->data ();
- size_t size = sub_->size ();
- zmq_assert (size > 0 && (*data == 0 || *data == 1));
-
- if (*data == 0)
- return subscriptions.rm (data + 1, size - 1, pipe_);
- else
- return subscriptions.add (data + 1, size - 1, pipe_);
+ return !pending.empty ();
}
void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
diff --git a/src/xpub.hpp b/src/xpub.hpp
index b824548..c5d64b5 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -28,7 +28,6 @@
#include "array.hpp"
#include "blob.hpp"
#include "dist.hpp"
-#include "fq.hpp"
namespace zmq
{
@@ -53,10 +52,6 @@ namespace zmq
private:
- // Applies the subscription to the trie. Return false if it is a
- // duplicate.
- bool apply_subscription (class msg_t *sub_, class pipe_t *pipe_);
-
// Function to be applied to the trie to send all the subsciptions
// upstream.
static void send_unsubscription (unsigned char *data_, size_t size_,
@@ -68,9 +63,6 @@ namespace zmq
// Distributor of messages holding the list of outbound pipes.
dist_t dist;
- // Object to fair-queue the subscription requests.
- fq_t fq;
-
// List of pending (un)subscriptions, ie. those that were already
// applied to the trie, but not yet received by the user.
typedef std::deque <blob_t> pending_t;