diff options
| -rw-r--r-- | src/fq.cpp | 31 | ||||
| -rw-r--r-- | src/fq.hpp | 4 | ||||
| -rw-r--r-- | src/pipe.cpp | 10 | ||||
| -rw-r--r-- | src/pub.cpp | 3 | ||||
| -rw-r--r-- | src/sub.cpp | 27 | ||||
| -rw-r--r-- | src/sub.hpp | 4 | ||||
| -rw-r--r-- | src/ypipe.hpp | 2 | 
7 files changed, 69 insertions, 12 deletions
@@ -25,7 +25,8 @@  zmq::fq_t::fq_t () :      active (0), -    current (0) +    current (0), +    tbc (false)  {  } @@ -44,6 +45,8 @@ void zmq::fq_t::attach (reader_t *pipe_)  void zmq::fq_t::detach (reader_t *pipe_)  { +    zmq_assert (!tbc || pipes [current] != pipe_); +      //  Remove the pipe from the list; adjust number of active pipes      //  accordingly.      if (pipes.index (pipe_) < active) { @@ -75,14 +78,26 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)      //  Deallocate old content of the message.      zmq_msg_close (msg_); -    //  Round-robin over the pipes to get next message. +    //  Round-robin over the pipes to get the next message.      for (int count = active; count != 0; count--) { + +        //  Try to fetch new message. If we've already read part of the message +        //  subsequent part should be immediately available.          bool fetched = pipes [current]->read (msg_); -        current++; -        if (current >= active) -            current = 0; -        if (fetched) +        zmq_assert (!(tbc && !fetched)); + +        //  Note that when message is not fetched, current pipe is killed and +        //  replaced by another active pipe. Thus we don't have to increase +        //  the 'current' pointer. +        if (fetched) { +            tbc = msg_->flags & ZMQ_MSG_TBC; +            if (!tbc) { +                current++; +                if (current >= active) +                    current = 0; +            }              return 0; +        }      }      //  No message is available. Initialise the output parameter @@ -94,6 +109,10 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)  bool zmq::fq_t::has_in ()  { +    //  There are subsequent parts of the partly-read message available. +    if (tbc) +        return true; +      //  Note that messing with current doesn't break the fairness of fair      //  queueing algorithm. If there are no messages available current will      //  get back to its original value. Otherwise it'll point to the first @@ -55,6 +55,10 @@ namespace zmq          //  Index of the next bound pipe to read a message from.          pipes_t::size_type current; +        //  If true, part of a multipart message was already received, but +        //  there are following parts still waiting in the current pipe. +        bool tbc; +          fq_t (const fq_t&);          void operator = (const fq_t&);      }; diff --git a/src/pipe.cpp b/src/pipe.cpp index 14a6ef4..e844865 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -77,7 +77,9 @@ bool zmq::reader_t::read (zmq_msg_t *msg_)          return false;      } -    msgs_read++; +    if (!(msg_->flags & ZMQ_MSG_TBC)) +        msgs_read++; +      if (lwm > 0 && msgs_read % lwm == 0)          send_reader_info (peer, msgs_read); @@ -161,7 +163,8 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)      }      pipe->write (*msg_); -    msgs_written++; +    if (!(msg_->flags & ZMQ_MSG_TBC)) +        msgs_written++;      return true;  } @@ -194,6 +197,9 @@ void zmq::writer_t::term ()  {      endpoint = NULL; +    //  Rollback any unfinished messages. +    rollback (); +      //  Push delimiter into the pipe.      //  Trick the compiler to belive that the tag is a valid pointer.      zmq_msg_t msg; diff --git a/src/pub.cpp b/src/pub.cpp index 6f92cd2..1e86b28 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -170,7 +170,8 @@ bool zmq::pub_t::write (class writer_t *pipe_, zmq_msg_t *msg_)          pipes.swap (pipes.index (pipe_), active);          return false;      } -    pipe_->flush (); +    if (!(msg_->flags & ZMQ_MSG_TBC)) +        pipe_->flush ();      return true;  } diff --git a/src/sub.cpp b/src/sub.cpp index e32e198..fd3176f 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -26,7 +26,8 @@  zmq::sub_t::sub_t (class app_thread_t *parent_) :      socket_base_t (parent_), -    has_message (false) +    has_message (false), +    tbc (false)  {      options.requires_in = true;      options.requires_out = false; @@ -105,6 +106,7 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)      if (has_message) {          zmq_msg_move (msg_, &message);          has_message = false; +        tbc = msg_->flags & ZMQ_MSG_TBC;          return 0;      } @@ -122,13 +124,27 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)              return -1;          //  Check whether the message matches at least one subscription. -        if (match (msg_)) +        //  Non-initial parts of the message are passed  +        if (tbc || match (msg_)) { +            tbc = msg_->flags & ZMQ_MSG_TBC;              return 0; +        } + +        //  Message doesn't match. Pop any remaining parts of the message +        //  from the pipe. +        while (msg_->flags & ZMQ_MSG_TBC) { +            rc = fq.recv (msg_, ZMQ_NOBLOCK); +            zmq_assert (rc == 0); +        }      }  }  bool zmq::sub_t::xhas_in ()  { +    //  There are subsequent parts of the partly-read message available. +    if (tbc) +        return true; +      //  If there's already a message prepared by a previous call to zmq_poll,      //  return straight ahead.      if (has_message) @@ -153,6 +169,13 @@ bool zmq::sub_t::xhas_in ()              has_message = true;              return true;          } + +        //  Message doesn't match. Pop any remaining parts of the message +        //  from the pipe. +        while (message.flags & ZMQ_MSG_TBC) { +            rc = fq.recv (&message, ZMQ_NOBLOCK); +            zmq_assert (rc == 0); +        }      }  } diff --git a/src/sub.hpp b/src/sub.hpp index 84fab5e..d630731 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -68,6 +68,10 @@ namespace zmq          bool has_message;          zmq_msg_t message; +        //  If true, part of a multipart message was already received, but +        //  there are following parts still waiting. +        bool tbc; +          sub_t (const sub_t&);          void operator = (const sub_t&);      }; diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 225b6a7..83ae6a7 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -84,8 +84,8 @@ namespace zmq          {              if (w == &queue.back ())                  return false; -            *value_ = queue.back ();              queue.unpush (); +            *value_ = queue.back ();              return true;          }  | 
