From 89783c37d2b8a7b5519eab7922b460449aa0bf3f Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 19 May 2010 06:31:57 +0200 Subject: incomplete messages can be stored in ypipe --- src/ypipe.hpp | 46 ++++++++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 18 deletions(-) (limited to 'src/ypipe.hpp') diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 445b487..df5b3d0 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -31,7 +31,7 @@ namespace zmq // Only a single thread can read from the pipe at any specific moment. // Only a single thread can write to the pipe at any specific moment. // T is the type of the object in the queue. - // N is granularity of the pipe, i.e. how many messages are needed to + // N is granularity of the pipe, i.e. how many items are needed to // perform next memory allocation. template class ypipe_t @@ -46,7 +46,7 @@ namespace zmq // Let all the pointers to point to the terminator. // (unless pipe is dead, in which case c is set to NULL). - r = w = &queue.back (); + r = w = f = &queue.back (); c.set (&queue.back ()); } @@ -59,54 +59,61 @@ namespace zmq #pragma message disable(UNINIT) #endif - // Write an item to the pipe. Don't flush it yet. - inline void write (const T &value_) + // Write an item to the pipe. Don't flush it yet. If incomplete is + // set to true the item is assumed to be continued by items + // subsequently written to the pipe. Incomplete items are never + // flushed down the stream. + inline void write (const T &value_, bool incomplete_) { // Place the value to the queue, add new terminator element. queue.back () = value_; queue.push (); + + // Move the "flush up to here" poiter. + if (!incomplete_) + f = &queue.back (); } #ifdef ZMQ_HAVE_OPENVMS #pragma message restore #endif - // Pop an unflushed message from the pipe. Returns true is such - // message exists, false otherwise. + // Pop an incomplete item from the pipe. Returns true is such + // item exists, false otherwise. inline bool unwrite (T *value_) { - if (w == &queue.back ()) + if (f == &queue.back ()) return false; queue.unpush (); *value_ = queue.back (); return true; } - // Flush the messages into the pipe. Returns false if the reader - // thread is sleeping. In that case, caller is obliged to wake the - // reader up before using the pipe again. + // Flush all the completed items into the pipe. Returns false if + // the reader thread is sleeping. In that case, caller is obliged to + // wake the reader up before using the pipe again. inline bool flush () { // If there are no un-flushed items, do nothing. - if (w == &queue.back ()) + if (w == f) return true; - // Try to set 'c' to 'back' - if (c.cas (w, &queue.back ()) != w) { + // Try to set 'c' to 'f'. + if (c.cas (w, f) != w) { // Compare-and-swap was unseccessful because 'c' is NULL. // This means that the reader is asleep. Therefore we don't // care about thread-safeness and update c in non-atomic // manner. We'll return false to let the caller know // that reader is sleeping. - c.set (&queue.back ()); - w = &queue.back (); + c.set (f); + w = f; return false; } // Reader is alive. Nothing special to do now. Just move - // the 'first un-flushed item' pointer to the end of the queue. - w = &queue.back (); + // the 'first un-flushed item' pointer to 'f'. + w = f; return true; } @@ -125,7 +132,7 @@ namespace zmq // If there are no elements prefetched, exit. // During pipe's lifetime r should never be NULL, however, - // it can happen during pipe shutdown when messages + // it can happen during pipe shutdown when items // are being deallocated. if (&queue.front () == r || !r) return false; @@ -165,6 +172,9 @@ namespace zmq // exclusively by reader thread. T *r; + // Points to the first item to be flushed in the future. + T *f; + // The single point of contention between writer and reader thread. // Points past the last flushed item. If it is NULL, // reader is asleep. This pointer should be always accessed using -- cgit v1.2.3