diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2010-05-19 06:31:57 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-05-19 06:31:57 +0200 |
commit | 89783c37d2b8a7b5519eab7922b460449aa0bf3f (patch) | |
tree | 8b326b382793d4b8da4134bfde58393532fd015e | |
parent | f40ce4e500d32b4240395e09e0ce3359734f0189 (diff) |
incomplete messages can be stored in ypipe
-rw-r--r-- | src/pipe.cpp | 10 | ||||
-rw-r--r-- | src/ypipe.hpp | 46 |
2 files changed, 32 insertions, 24 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index 61ddf38..f592f8c 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -162,7 +162,7 @@ bool zmq::writer_t::write (zmq_msg_t *msg_) return false; } - pipe->write (*msg_); + pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); if (!(msg_->flags & ZMQ_MSG_MORE)) msgs_written++; return true; @@ -172,11 +172,9 @@ void zmq::writer_t::rollback () { zmq_msg_t msg; + // Remove all incomplete messages from the pipe. while (pipe->unwrite (&msg)) { - if (!(msg.flags & ZMQ_MSG_MORE)) { - pipe->write (msg); - break; - } + zmq_assert (msg.flags & ZMQ_MSG_MORE); zmq_msg_close (&msg); msgs_written--; } @@ -206,7 +204,7 @@ void zmq::writer_t::term () const unsigned char *offset = 0; msg.content = (void*) (offset + ZMQ_DELIMITER); msg.flags = 0; - pipe->write (msg); + pipe->write (msg, false); pipe->flush (); } diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 445b487..df5b3d0 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -31,7 +31,7 @@ namespace zmq // Only a single thread can read from the pipe at any specific moment. // Only a single thread can write to the pipe at any specific moment. // T is the type of the object in the queue. - // N is granularity of the pipe, i.e. how many messages are needed to + // N is granularity of the pipe, i.e. how many items are needed to // perform next memory allocation. template <typename T, int N> class ypipe_t @@ -46,7 +46,7 @@ namespace zmq // Let all the pointers to point to the terminator. // (unless pipe is dead, in which case c is set to NULL). - r = w = &queue.back (); + r = w = f = &queue.back (); c.set (&queue.back ()); } @@ -59,54 +59,61 @@ namespace zmq #pragma message disable(UNINIT) #endif - // Write an item to the pipe. Don't flush it yet. - inline void write (const T &value_) + // Write an item to the pipe. Don't flush it yet. If incomplete is + // set to true the item is assumed to be continued by items + // subsequently written to the pipe. Incomplete items are never + // flushed down the stream. + inline void write (const T &value_, bool incomplete_) { // Place the value to the queue, add new terminator element. queue.back () = value_; queue.push (); + + // Move the "flush up to here" poiter. + if (!incomplete_) + f = &queue.back (); } #ifdef ZMQ_HAVE_OPENVMS #pragma message restore #endif - // Pop an unflushed message from the pipe. Returns true is such - // message exists, false otherwise. + // Pop an incomplete item from the pipe. Returns true is such + // item exists, false otherwise. inline bool unwrite (T *value_) { - if (w == &queue.back ()) + if (f == &queue.back ()) return false; queue.unpush (); *value_ = queue.back (); return true; } - // Flush the messages into the pipe. Returns false if the reader - // thread is sleeping. In that case, caller is obliged to wake the - // reader up before using the pipe again. + // Flush all the completed items into the pipe. Returns false if + // the reader thread is sleeping. In that case, caller is obliged to + // wake the reader up before using the pipe again. inline bool flush () { // If there are no un-flushed items, do nothing. - if (w == &queue.back ()) + if (w == f) return true; - // Try to set 'c' to 'back' - if (c.cas (w, &queue.back ()) != w) { + // Try to set 'c' to 'f'. + if (c.cas (w, f) != w) { // Compare-and-swap was unseccessful because 'c' is NULL. // This means that the reader is asleep. Therefore we don't // care about thread-safeness and update c in non-atomic // manner. We'll return false to let the caller know // that reader is sleeping. - c.set (&queue.back ()); - w = &queue.back (); + c.set (f); + w = f; return false; } // Reader is alive. Nothing special to do now. Just move - // the 'first un-flushed item' pointer to the end of the queue. - w = &queue.back (); + // the 'first un-flushed item' pointer to 'f'. + w = f; return true; } @@ -125,7 +132,7 @@ namespace zmq // If there are no elements prefetched, exit. // During pipe's lifetime r should never be NULL, however, - // it can happen during pipe shutdown when messages + // it can happen during pipe shutdown when items // are being deallocated. if (&queue.front () == r || !r) return false; @@ -165,6 +172,9 @@ namespace zmq // exclusively by reader thread. T *r; + // Points to the first item to be flushed in the future. + T *f; + // The single point of contention between writer and reader thread. // Points past the last flushed item. If it is NULL, // reader is asleep. This pointer should be always accessed using |