diff options
-rw-r--r-- | src/pipe.cpp | 13 | ||||
-rw-r--r-- | src/pipe.hpp | 3 | ||||
-rw-r--r-- | src/ypipe.hpp | 11 | ||||
-rw-r--r-- | src/yqueue.hpp | 37 |
4 files changed, 63 insertions, 1 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index a8e7bb9..e738128 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -146,6 +146,19 @@ bool zmq::writer_t::write (zmq_msg_t *msg_) // TODO: Adjust size of the pipe. } +void zmq::writer_t::rollback () +{ + while (true) { + zmq_msg_t msg; + if (!pipe->unwrite (&msg)) + break; + zmq_msg_close (&msg); + } + + // TODO: We don't have to inform the reader side of the pipe about + // the event. We'll simply adjust the pipe size and keep calm. +} + void zmq::writer_t::flush () { if (!pipe->flush ()) diff --git a/src/pipe.hpp b/src/pipe.hpp index a155e95..771081c 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -100,6 +100,9 @@ namespace zmq // message cannot be written because high watermark was reached. bool write (zmq_msg_t *msg_); + // Remove any unflushed messages from the pipe. + void rollback (); + // Flush the messages downsteam. void flush (); diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 26c50ab..225b6a7 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -78,6 +78,17 @@ namespace zmq #pragma message restore #endif + // Pop an unflushed message from the pipe. Returns true is such + // message exists, false otherwise. + inline bool unwrite (T *value_) + { + if (w == &queue.back ()) + return false; + *value_ = queue.back (); + queue.unpush (); + return true; + } + // Flush the messages into the pipe. Returns false if the reader // thread is sleeping. In that case, caller is obliged to wake the // reader up before using the pipe again. diff --git a/src/yqueue.hpp b/src/yqueue.hpp index 28b5fdd..9eaceb5 100644 --- a/src/yqueue.hpp +++ b/src/yqueue.hpp @@ -102,20 +102,54 @@ namespace zmq chunk_t *sc = spare_chunk.xchg (NULL); if (sc) { end_chunk->next = sc; + sc->prev = end_chunk; } else { end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t)); zmq_assert (end_chunk->next); + end_chunk->next->prev = end_chunk; } end_chunk = end_chunk->next; end_pos = 0; } + // Removes element from the back end of the queue. In other words + // it rollbacks last push to the queue. Take care: Caller is + // responsible for destroying the object being unpushed. + // The caller must also guarantee that the queue isn't empty when + // unpush is called. It cannot be done automatically as the read + // side of the queue can be managed by different, completely + // unsynchronised thread. + inline void unpush () + { + // First, move 'back' one position backwards. + if (back_pos) + --back_pos; + else { + back_pos = N - 1; + back_chunk = back_chunk->prev; + } + + // Now, move 'end' position backwards. Note that obsolete end chunk + // is not used as a spare chunk. The analysis shows that doing so + // would require free and atomic operation per chunk deallocated + // instead of a simple free. + if (end_pos) + --end_pos; + else { + end_pos = N - 1; + end_chunk = end_chunk->prev; + free (end_chunk->next); + end_chunk->next = NULL; + } + } + // Removes an element from the front end of the queue. inline void pop () { if (++ begin_pos == N) { chunk_t *o = begin_chunk; begin_chunk = begin_chunk->next; + begin_chunk->prev = NULL; begin_pos = 0; // 'o' has been more recently used than spare_chunk, @@ -133,6 +167,7 @@ namespace zmq struct chunk_t { T values [N]; + chunk_t *prev; chunk_t *next; }; @@ -149,7 +184,7 @@ namespace zmq // People are likely to produce and consume at similar rates. In // this scenario holding onto the most recently freed chunk saves - // us from having to call new/delete. + // us from having to call malloc/free. atomic_ptr_t<chunk_t> spare_chunk; // Disable copying of yqueue. |