summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/pipe.cpp13
-rw-r--r--src/pipe.hpp3
-rw-r--r--src/ypipe.hpp11
-rw-r--r--src/yqueue.hpp37
4 files changed, 63 insertions, 1 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index a8e7bb9..e738128 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -146,6 +146,19 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)
// TODO: Adjust size of the pipe.
}
+void zmq::writer_t::rollback ()
+{
+ while (true) {
+ zmq_msg_t msg;
+ if (!pipe->unwrite (&msg))
+ break;
+ zmq_msg_close (&msg);
+ }
+
+ // TODO: We don't have to inform the reader side of the pipe about
+ // the event. We'll simply adjust the pipe size and keep calm.
+}
+
void zmq::writer_t::flush ()
{
if (!pipe->flush ())
diff --git a/src/pipe.hpp b/src/pipe.hpp
index a155e95..771081c 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -100,6 +100,9 @@ namespace zmq
// message cannot be written because high watermark was reached.
bool write (zmq_msg_t *msg_);
+ // Remove any unflushed messages from the pipe.
+ void rollback ();
+
// Flush the messages downstream.
void flush ();
diff --git a/src/ypipe.hpp b/src/ypipe.hpp
index 26c50ab..225b6a7 100644
--- a/src/ypipe.hpp
+++ b/src/ypipe.hpp
@@ -78,6 +78,17 @@ namespace zmq
#pragma message restore
#endif
+ // Pop an unflushed message from the pipe. Returns true if such
+ // a message exists, false otherwise.
+ inline bool unwrite (T *value_)
+ {
+ if (w == &queue.back ())
+ return false;
+ *value_ = queue.back ();
+ queue.unpush ();
+ return true;
+ }
+
// Flush the messages into the pipe. Returns false if the reader
// thread is sleeping. In that case, caller is obliged to wake the
// reader up before using the pipe again.
diff --git a/src/yqueue.hpp b/src/yqueue.hpp
index 28b5fdd..9eaceb5 100644
--- a/src/yqueue.hpp
+++ b/src/yqueue.hpp
@@ -102,20 +102,54 @@ namespace zmq
chunk_t *sc = spare_chunk.xchg (NULL);
if (sc) {
end_chunk->next = sc;
+ sc->prev = end_chunk;
} else {
end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t));
zmq_assert (end_chunk->next);
+ end_chunk->next->prev = end_chunk;
}
end_chunk = end_chunk->next;
end_pos = 0;
}
+ // Removes an element from the back end of the queue. In other words
+ // it rolls back the last push to the queue. Take care: the caller is
+ // responsible for destroying the object being unpushed.
+ // The caller must also guarantee that the queue isn't empty when
+ // unpush is called. It cannot be done automatically as the read
+ // side of the queue can be managed by different, completely
+ // unsynchronised thread.
+ inline void unpush ()
+ {
+ // First, move 'back' one position backwards.
+ if (back_pos)
+ --back_pos;
+ else {
+ back_pos = N - 1;
+ back_chunk = back_chunk->prev;
+ }
+
+ // Now, move 'end' position backwards. Note that the obsolete end chunk
+ // is not retained as a spare chunk. The analysis shows that doing so
+ // would require a free and an atomic operation per deallocated chunk
+ // instead of a simple free.
+ if (end_pos)
+ --end_pos;
+ else {
+ end_pos = N - 1;
+ end_chunk = end_chunk->prev;
+ free (end_chunk->next);
+ end_chunk->next = NULL;
+ }
+ }
+
// Removes an element from the front end of the queue.
inline void pop ()
{
if (++ begin_pos == N) {
chunk_t *o = begin_chunk;
begin_chunk = begin_chunk->next;
+ begin_chunk->prev = NULL;
begin_pos = 0;
// 'o' has been more recently used than spare_chunk,
@@ -133,6 +167,7 @@ namespace zmq
struct chunk_t
{
T values [N];
+ chunk_t *prev;
chunk_t *next;
};
@@ -149,7 +184,7 @@ namespace zmq
// People are likely to produce and consume at similar rates. In
// this scenario holding onto the most recently freed chunk saves
- // us from having to call new/delete.
+ // us from having to call malloc/free.
atomic_ptr_t<chunk_t> spare_chunk;
// Disable copying of yqueue.