summaryrefslogtreecommitdiff
path: root/src/ypipe.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/ypipe.hpp')
-rw-r--r--src/ypipe.hpp46
1 files changed, 28 insertions, 18 deletions
diff --git a/src/ypipe.hpp b/src/ypipe.hpp
index 445b487..df5b3d0 100644
--- a/src/ypipe.hpp
+++ b/src/ypipe.hpp
@@ -31,7 +31,7 @@ namespace zmq
// Only a single thread can read from the pipe at any specific moment.
// Only a single thread can write to the pipe at any specific moment.
// T is the type of the object in the queue.
- // N is granularity of the pipe, i.e. how many messages are needed to
+ // N is granularity of the pipe, i.e. how many items are needed to
// perform next memory allocation.
template <typename T, int N> class ypipe_t
@@ -46,7 +46,7 @@ namespace zmq
// Let all the pointers to point to the terminator.
// (unless pipe is dead, in which case c is set to NULL).
- r = w = &queue.back ();
+ r = w = f = &queue.back ();
c.set (&queue.back ());
}
@@ -59,54 +59,61 @@ namespace zmq
#pragma message disable(UNINIT)
#endif
- // Write an item to the pipe. Don't flush it yet.
- inline void write (const T &value_)
+ // Write an item to the pipe. Don't flush it yet. If incomplete is
+ // set to true the item is assumed to be continued by items
+ // subsequently written to the pipe. Incomplete items are never
+ // flushed down the stream.
+ inline void write (const T &value_, bool incomplete_)
{
// Place the value to the queue, add new terminator element.
queue.back () = value_;
queue.push ();
+
+ // Move the "flush up to here" poiter.
+ if (!incomplete_)
+ f = &queue.back ();
}
#ifdef ZMQ_HAVE_OPENVMS
#pragma message restore
#endif
- // Pop an unflushed message from the pipe. Returns true is such
- // message exists, false otherwise.
+ // Pop an incomplete item from the pipe. Returns true is such
+ // item exists, false otherwise.
inline bool unwrite (T *value_)
{
- if (w == &queue.back ())
+ if (f == &queue.back ())
return false;
queue.unpush ();
*value_ = queue.back ();
return true;
}
- // Flush the messages into the pipe. Returns false if the reader
- // thread is sleeping. In that case, caller is obliged to wake the
- // reader up before using the pipe again.
+ // Flush all the completed items into the pipe. Returns false if
+ // the reader thread is sleeping. In that case, caller is obliged to
+ // wake the reader up before using the pipe again.
inline bool flush ()
{
// If there are no un-flushed items, do nothing.
- if (w == &queue.back ())
+ if (w == f)
return true;
- // Try to set 'c' to 'back'
- if (c.cas (w, &queue.back ()) != w) {
+ // Try to set 'c' to 'f'.
+ if (c.cas (w, f) != w) {
// Compare-and-swap was unseccessful because 'c' is NULL.
// This means that the reader is asleep. Therefore we don't
// care about thread-safeness and update c in non-atomic
// manner. We'll return false to let the caller know
// that reader is sleeping.
- c.set (&queue.back ());
- w = &queue.back ();
+ c.set (f);
+ w = f;
return false;
}
// Reader is alive. Nothing special to do now. Just move
- // the 'first un-flushed item' pointer to the end of the queue.
- w = &queue.back ();
+ // the 'first un-flushed item' pointer to 'f'.
+ w = f;
return true;
}
@@ -125,7 +132,7 @@ namespace zmq
// If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however,
- // it can happen during pipe shutdown when messages
+ // it can happen during pipe shutdown when items
// are being deallocated.
if (&queue.front () == r || !r)
return false;
@@ -165,6 +172,9 @@ namespace zmq
// exclusively by reader thread.
T *r;
+ // Points to the first item to be flushed in the future.
+ T *f;
+
// The single point of contention between writer and reader thread.
// Points past the last flushed item. If it is NULL,
// reader is asleep. This pointer should be always accessed using