summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-05-19 06:31:57 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-05-19 06:31:57 +0200
commit89783c37d2b8a7b5519eab7922b460449aa0bf3f (patch)
tree8b326b382793d4b8da4134bfde58393532fd015e /src
parentf40ce4e500d32b4240395e09e0ce3359734f0189 (diff)
incomplete messages can be stored in ypipe
Diffstat (limited to 'src')
-rw-r--r--src/pipe.cpp10
-rw-r--r--src/ypipe.hpp46
2 files changed, 32 insertions, 24 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 61ddf38..f592f8c 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -162,7 +162,7 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)
return false;
}
- pipe->write (*msg_);
+ pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);
if (!(msg_->flags & ZMQ_MSG_MORE))
msgs_written++;
return true;
@@ -172,11 +172,9 @@ void zmq::writer_t::rollback ()
{
zmq_msg_t msg;
+ // Remove all incomplete messages from the pipe.
while (pipe->unwrite (&msg)) {
- if (!(msg.flags & ZMQ_MSG_MORE)) {
- pipe->write (msg);
- break;
- }
+ zmq_assert (msg.flags & ZMQ_MSG_MORE);
zmq_msg_close (&msg);
msgs_written--;
}
@@ -206,7 +204,7 @@ void zmq::writer_t::term ()
const unsigned char *offset = 0;
msg.content = (void*) (offset + ZMQ_DELIMITER);
msg.flags = 0;
- pipe->write (msg);
+ pipe->write (msg, false);
pipe->flush ();
}
diff --git a/src/ypipe.hpp b/src/ypipe.hpp
index 445b487..df5b3d0 100644
--- a/src/ypipe.hpp
+++ b/src/ypipe.hpp
@@ -31,7 +31,7 @@ namespace zmq
// Only a single thread can read from the pipe at any specific moment.
// Only a single thread can write to the pipe at any specific moment.
// T is the type of the object in the queue.
- // N is granularity of the pipe, i.e. how many messages are needed to
+ // N is granularity of the pipe, i.e. how many items are needed to
// perform next memory allocation.
template <typename T, int N> class ypipe_t
@@ -46,7 +46,7 @@ namespace zmq
// Let all the pointers to point to the terminator.
// (unless pipe is dead, in which case c is set to NULL).
- r = w = &queue.back ();
+ r = w = f = &queue.back ();
c.set (&queue.back ());
}
@@ -59,54 +59,61 @@ namespace zmq
#pragma message disable(UNINIT)
#endif
- // Write an item to the pipe. Don't flush it yet.
- inline void write (const T &value_)
+ // Write an item to the pipe. Don't flush it yet. If incomplete is
+ // set to true the item is assumed to be continued by items
+ // subsequently written to the pipe. Incomplete items are never
+ // flushed down the stream.
+ inline void write (const T &value_, bool incomplete_)
{
// Place the value to the queue, add new terminator element.
queue.back () = value_;
queue.push ();
+
+ // Move the "flush up to here" poiter.
+ if (!incomplete_)
+ f = &queue.back ();
}
#ifdef ZMQ_HAVE_OPENVMS
#pragma message restore
#endif
- // Pop an unflushed message from the pipe. Returns true is such
- // message exists, false otherwise.
+ // Pop an incomplete item from the pipe. Returns true is such
+ // item exists, false otherwise.
inline bool unwrite (T *value_)
{
- if (w == &queue.back ())
+ if (f == &queue.back ())
return false;
queue.unpush ();
*value_ = queue.back ();
return true;
}
- // Flush the messages into the pipe. Returns false if the reader
- // thread is sleeping. In that case, caller is obliged to wake the
- // reader up before using the pipe again.
+ // Flush all the completed items into the pipe. Returns false if
+ // the reader thread is sleeping. In that case, caller is obliged to
+ // wake the reader up before using the pipe again.
inline bool flush ()
{
// If there are no un-flushed items, do nothing.
- if (w == &queue.back ())
+ if (w == f)
return true;
- // Try to set 'c' to 'back'
- if (c.cas (w, &queue.back ()) != w) {
+ // Try to set 'c' to 'f'.
+ if (c.cas (w, f) != w) {
// Compare-and-swap was unseccessful because 'c' is NULL.
// This means that the reader is asleep. Therefore we don't
// care about thread-safeness and update c in non-atomic
// manner. We'll return false to let the caller know
// that reader is sleeping.
- c.set (&queue.back ());
- w = &queue.back ();
+ c.set (f);
+ w = f;
return false;
}
// Reader is alive. Nothing special to do now. Just move
- // the 'first un-flushed item' pointer to the end of the queue.
- w = &queue.back ();
+ // the 'first un-flushed item' pointer to 'f'.
+ w = f;
return true;
}
@@ -125,7 +132,7 @@ namespace zmq
// If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however,
- // it can happen during pipe shutdown when messages
+ // it can happen during pipe shutdown when items
// are being deallocated.
if (&queue.front () == r || !r)
return false;
@@ -165,6 +172,9 @@ namespace zmq
// exclusively by reader thread.
T *r;
+ // Points to the first item to be flushed in the future.
+ T *f;
+
// The single point of contention between writer and reader thread.
// Points past the last flushed item. If it is NULL,
// reader is asleep. This pointer should be always accessed using