diff options
| -rw-r--r-- | src/xpub.cpp | 127 | ||||
| -rw-r--r-- | src/xpub.hpp | 8 | 
2 files changed, 42 insertions, 93 deletions
| diff --git a/src/xpub.cpp b/src/xpub.cpp index 9158cf4..4b41696 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -39,12 +39,41 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)  {      zmq_assert (pipe_);      dist.attach (pipe_); -    fq.attach (pipe_); + +    //  The pipe is active when attached. Let's read the subscriptions from +    //  it, if any. +    xread_activated (pipe_);  }  void zmq::xpub_t::xread_activated (pipe_t *pipe_)  { -    fq.activated (pipe_); +    //  There are some subscriptions waiting. Let's process them. +    msg_t sub; +    sub.init (); +    while (true) { + +        //  Grab next subscription. +        if (!pipe_->read (&sub)) { +            sub.close (); +            return; +        } + +        //  Apply the subscription to the trie. +        unsigned char *data = (unsigned char*) sub.data (); +        size_t size = sub.size (); +        zmq_assert (size > 0 && (*data == 0 || *data == 1)); +        bool unique; +		if (*data == 0) +		    unique = subscriptions.rm (data + 1, size - 1, pipe_); +		else +		    unique = subscriptions.add (data + 1, size - 1, pipe_); + +        //  If the subscription is not a duplicate store it so that it can be +        //  passed to used on next recv call. +        if (unique && options.type != ZMQ_PUB) +            pending.push_back (blob_t ((unsigned char*) sub.data (), +                sub.size ())); +    }  }  void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) @@ -60,31 +89,10 @@ void zmq::xpub_t::xterminated (pipe_t *pipe_)      subscriptions.rm (pipe_, send_unsubscription, this);      dist.terminated (pipe_); -    fq.terminated (pipe_);  }  int zmq::xpub_t::xsend (msg_t *msg_, int flags_) -{ -    //  First, process any (un)subscriptions from downstream. -    msg_t sub; -    sub.init (); -    while (true) { - -        //  Grab next subscription. -        pipe_t *pipe; -        int rc = fq.recvpipe (&sub, 0, &pipe); -        if (rc != 0 && errno == EAGAIN) -            break; -        errno_assert (rc == 0); - -        //  Apply the subscription to the trie. If it's not a duplicate, -        //  store it so that it can be passed to used on next recv call. -        if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB) -            pending.push_back (blob_t ((unsigned char*) sub.data (), -                sub.size ())); -    } -    sub.close (); -     +{          return dist.send (msg_, flags_);  } @@ -96,75 +104,24 @@ bool zmq::xpub_t::xhas_out ()  int zmq::xpub_t::xrecv (msg_t *msg_, int flags_)  {      //  If there is at least one  -    if (!pending.empty ()) { -        int rc = msg_->close (); -        errno_assert (rc == 0); -        rc = msg_->init_size (pending.front ().size ()); -        errno_assert (rc == 0); -        memcpy (msg_->data (), pending.front ().data (), -            pending.front ().size ()); -        pending.pop_front (); -        return 0; -    } - -    //  Grab and apply next subscription. -    pipe_t *pipe; -    int rc = fq.recvpipe (msg_, 0, &pipe); -    if (rc != 0) -        return -1; -    if (!apply_subscription (msg_, pipe)) { -//  TODO: This should be a loop rather! -        msg_->close (); -        msg_->init (); +    if (pending.empty ()) {          errno = EAGAIN;          return -1;      } + +    int rc = msg_->close (); +    errno_assert (rc == 0); +    rc = msg_->init_size (pending.front ().size ()); +    errno_assert (rc == 0); +    memcpy (msg_->data (), pending.front ().data (), +        pending.front ().size ()); +    pending.pop_front ();      return 0;  }  bool zmq::xpub_t::xhas_in ()  { -    if (!pending.empty ()) -        return true; - -    //  Even if there are subscriptions in the fair-queuer they may be -    //  duplicates. Thus, we have to check by hand wheter there is any -    //  subscription available to pass upstream. -    //  First, process any (un)subscriptions from downstream. -    msg_t sub; -    sub.init (); -    while (true) { - -        //  Grab next subscription. -        pipe_t *pipe; -        int rc = fq.recvpipe (&sub, 0, &pipe); -        if (rc != 0 && errno == EAGAIN) { -            sub.close (); -            return false; -        } -        errno_assert (rc == 0); - -        //  Apply the subscription to the trie. If it's not a duplicate store -        //  it so that it can be passed to used on next recv call. -        if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB) { -            pending.push_back (blob_t ((unsigned char*) sub.data (), -                sub.size ())); -            sub.close (); -            return true; -        } -    } -} - -bool zmq::xpub_t::apply_subscription (msg_t *sub_, pipe_t *pipe_) -{ -    unsigned char *data = (unsigned char*) sub_->data (); -    size_t size = sub_->size (); -    zmq_assert (size > 0 && (*data == 0 || *data == 1)); - -    if (*data == 0) -        return subscriptions.rm (data + 1, size - 1, pipe_); -    else -        return subscriptions.add (data + 1, size - 1, pipe_); +    return !pending.empty ();  }  void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, diff --git a/src/xpub.hpp b/src/xpub.hpp index b824548..c5d64b5 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -28,7 +28,6 @@  #include "array.hpp"  #include "blob.hpp"  #include "dist.hpp" -#include "fq.hpp"  namespace zmq  { @@ -53,10 +52,6 @@ namespace zmq      private: -        //  Applies the subscription to the trie. Return false if it is a -        //  duplicate. -        bool apply_subscription (class msg_t *sub_, class pipe_t *pipe_); -          //  Function to be applied to the trie to send all the subsciptions          //  upstream.          static void send_unsubscription (unsigned char *data_, size_t size_, @@ -68,9 +63,6 @@ namespace zmq          //  Distributor of messages holding the list of outbound pipes.          dist_t dist; -        //  Object to fair-queue the subscription requests. -        fq_t fq; -          //  List of pending (un)subscriptions, ie. those that were already          //  applied to the trie, but not yet received by the user.          typedef std::deque <blob_t> pending_t; | 
