diff options
| author | Martin Sustrik <sustrik@250bpm.com> | 2010-03-09 08:43:20 +0100 | 
|---|---|---|
| committer | Martin Sustrik <sustrik@250bpm.com> | 2010-03-09 08:43:20 +0100 | 
| commit | e04e2cdbbaf351eb04164bdcd293fcb8fa22a9a4 (patch) | |
| tree | 419a8ff7db0495de57716d4595dc3d235c6ad6f6 | |
| parent | 9481c69b0f60068f12aa26699588fed6a8faceec (diff) | |
rollback functionality added to pipe
| -rw-r--r-- | src/pipe.cpp | 13 | ||||
| -rw-r--r-- | src/pipe.hpp | 3 | ||||
| -rw-r--r-- | src/ypipe.hpp | 11 | ||||
| -rw-r--r-- | src/yqueue.hpp | 37 | 
4 files changed, 63 insertions, 1 deletions
| diff --git a/src/pipe.cpp b/src/pipe.cpp index a8e7bb9..e738128 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -146,6 +146,19 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)      //  TODO: Adjust size of the pipe.  } +void zmq::writer_t::rollback () +{ +    while (true) { +        zmq_msg_t msg; +        if (!pipe->unwrite (&msg)) +            break; +        zmq_msg_close (&msg); +    } + +    //  TODO: We don't have to inform the reader side of the pipe about +    //  the event. We'll simply adjust the pipe size and keep calm. +} +  void zmq::writer_t::flush ()  {      if (!pipe->flush ()) diff --git a/src/pipe.hpp b/src/pipe.hpp index a155e95..771081c 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -100,6 +100,9 @@ namespace zmq          //  message cannot be written because high watermark was reached.          bool write (zmq_msg_t *msg_); +        //  Remove any unflushed messages from the pipe. +        void rollback (); +          //  Flush the messages downsteam.          void flush (); diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 26c50ab..225b6a7 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -78,6 +78,17 @@ namespace zmq  #pragma message restore  #endif +        //  Pop an unflushed message from the pipe. Returns true is such +        //  message exists, false otherwise. +        inline bool unwrite (T *value_) +        { +            if (w == &queue.back ()) +                return false; +            *value_ = queue.back (); +            queue.unpush (); +            return true; +        } +          //  Flush the messages into the pipe. Returns false if the reader          //  thread is sleeping. In that case, caller is obliged to wake the          //  reader up before using the pipe again. diff --git a/src/yqueue.hpp b/src/yqueue.hpp index 28b5fdd..9eaceb5 100644 --- a/src/yqueue.hpp +++ b/src/yqueue.hpp @@ -102,20 +102,54 @@ namespace zmq              chunk_t *sc = spare_chunk.xchg (NULL);              if (sc) {                  end_chunk->next = sc; +                sc->prev = end_chunk;              } else {                  end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t));                  zmq_assert (end_chunk->next); +                end_chunk->next->prev = end_chunk;              }              end_chunk = end_chunk->next;              end_pos = 0;          } +        //  Removes element from the back end of the queue. In other words +        //  it rollbacks last push to the queue. Take care: Caller is +        //  responsible for destroying the object being unpushed. +        //  The caller must also guarantee that the queue isn't empty when +        //  unpush is called. It cannot be done automatically as the read +        //  side of the queue can be managed by different, completely +        //  unsynchronised thread. +        inline void unpush () +        { +            //  First, move 'back' one position backwards. +            if (back_pos) +                --back_pos; +            else { +                back_pos = N - 1; +                back_chunk = back_chunk->prev; +            } + +            //  Now, move 'end' position backwards. Note that obsolete end chunk +            //  is not used as a spare chunk. The analysis shows that doing so +            //  would require free and atomic operation per chunk deallocated +            //  instead of a simple free. +            if (end_pos) +                --end_pos; +            else { +                end_pos = N - 1; +                end_chunk = end_chunk->prev; +                free (end_chunk->next); +                end_chunk->next = NULL; +            } +        } +          //  Removes an element from the front end of the queue.          inline void pop ()          {              if (++ begin_pos == N) {                  chunk_t *o = begin_chunk;                  begin_chunk = begin_chunk->next; +                begin_chunk->prev = NULL;                  begin_pos = 0;                  //  'o' has been more recently used than spare_chunk, @@ -133,6 +167,7 @@ namespace zmq          struct chunk_t          {               T values [N]; +             chunk_t *prev;               chunk_t *next;          }; @@ -149,7 +184,7 @@ namespace zmq          //  People are likely to produce and consume at similar rates.  In          //  this scenario holding onto the most recently freed chunk saves -        //  us from having to call new/delete. +        //  us from having to call malloc/free.          atomic_ptr_t<chunk_t> spare_chunk;          //  Disable copying of yqueue. | 
