diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/pipe.cpp | 10 | ||||
| -rw-r--r-- | src/ypipe.hpp | 46 | 
2 files changed, 32 insertions, 24 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index 61ddf38..f592f8c 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -162,7 +162,7 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)          return false;      } -    pipe->write (*msg_); +    pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);      if (!(msg_->flags & ZMQ_MSG_MORE))          msgs_written++;      return true; @@ -172,11 +172,9 @@ void zmq::writer_t::rollback ()  {      zmq_msg_t msg; +    //  Remove all incomplete messages from the pipe.      while (pipe->unwrite (&msg)) { -        if (!(msg.flags & ZMQ_MSG_MORE)) { -            pipe->write (msg); -            break; -        } +        zmq_assert (msg.flags & ZMQ_MSG_MORE);          zmq_msg_close (&msg);          msgs_written--;      } @@ -206,7 +204,7 @@ void zmq::writer_t::term ()      const unsigned char *offset = 0;      msg.content = (void*) (offset + ZMQ_DELIMITER);      msg.flags = 0; -    pipe->write (msg); +    pipe->write (msg, false);      pipe->flush ();  } diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 445b487..df5b3d0 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -31,7 +31,7 @@ namespace zmq      //  Only a single thread can read from the pipe at any specific moment.      //  Only a single thread can write to the pipe at any specific moment.      //  T is the type of the object in the queue. -    //  N is granularity of the pipe, i.e. how many messages are needed to +    //  N is granularity of the pipe, i.e. how many items are needed to      //  perform next memory allocation.      template <typename T, int N> class ypipe_t @@ -46,7 +46,7 @@ namespace zmq              //  Let all the pointers to point to the terminator.              //  (unless pipe is dead, in which case c is set to NULL). -            r = w = &queue.back (); +            r = w = f = &queue.back ();              c.set (&queue.back ());          } @@ -59,54 +59,61 @@ namespace zmq  #pragma message disable(UNINIT)  #endif -        //  Write an item to the pipe.  Don't flush it yet. -        inline void write (const T &value_) +        //  Write an item to the pipe.  Don't flush it yet. If incomplete is +        //  set to true the item is assumed to be continued by items +        //  subsequently written to the pipe. Incomplete items are never +        //  flushed down the stream. +        inline void write (const T &value_, bool incomplete_)          {              //  Place the value to the queue, add new terminator element.              queue.back () = value_;              queue.push (); + +            //  Move the "flush up to here" poiter. +            if (!incomplete_) +                f = &queue.back ();          }  #ifdef ZMQ_HAVE_OPENVMS  #pragma message restore  #endif -        //  Pop an unflushed message from the pipe. Returns true is such -        //  message exists, false otherwise. +        //  Pop an incomplete item from the pipe. Returns true is such +        //  item exists, false otherwise.          inline bool unwrite (T *value_)          { -            if (w == &queue.back ()) +            if (f == &queue.back ())                  return false;              queue.unpush ();              *value_ = queue.back ();              return true;          } -        //  Flush the messages into the pipe. Returns false if the reader -        //  thread is sleeping. In that case, caller is obliged to wake the -        //  reader up before using the pipe again. +        //  Flush all the completed items into the pipe. Returns false if +        //  the reader thread is sleeping. In that case, caller is obliged to +        //  wake the reader up before using the pipe again.          inline bool flush ()          {              //  If there are no un-flushed items, do nothing. -            if (w == &queue.back ()) +            if (w == f)                  return true; -            //  Try to set 'c' to 'back' -            if (c.cas (w, &queue.back ()) != w) { +            //  Try to set 'c' to 'f'. +            if (c.cas (w, f) != w) {                  //  Compare-and-swap was unseccessful because 'c' is NULL.                  //  This means that the reader is asleep. Therefore we don't                  //  care about thread-safeness and update c in non-atomic                  //  manner. We'll return false to let the caller know                  //  that reader is sleeping. -                c.set (&queue.back ()); -                w = &queue.back (); +                c.set (f); +                w = f;                  return false;              }              //  Reader is alive. Nothing special to do now. Just move -            //  the 'first un-flushed item' pointer to the end of the queue. -            w = &queue.back (); +            //  the 'first un-flushed item' pointer to 'f'. +            w = f;              return true;          } @@ -125,7 +132,7 @@ namespace zmq              //  If there are no elements prefetched, exit.              //  During pipe's lifetime r should never be NULL, however, -            //  it can happen during pipe shutdown when messages +            //  it can happen during pipe shutdown when items              //  are being deallocated.              if (&queue.front () == r || !r)                  return false; @@ -165,6 +172,9 @@ namespace zmq          //  exclusively by reader thread.          T *r; +        //  Points to the first item to be flushed in the future. +        T *f; +          //  The single point of contention between writer and reader thread.          //  Points past the last flushed item. If it is NULL,          //  reader is asleep. This pointer should be always accessed using  | 
