From 235ed3a3dcffb7c658cbc9253eae9de54db24533 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 4 May 2010 10:22:16 +0200 Subject: signaler transports commands per se rather than one-bit signals --- src/ypipe.hpp | 83 ++++++++++++++--------------------------------------------- 1 file changed, 19 insertions(+), 64 deletions(-) (limited to 'src/ypipe.hpp') diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 83ae6a7..445b487 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -30,31 +30,24 @@ namespace zmq // Lock-free queue implementation. // Only a single thread can read from the pipe at any specific moment. // Only a single thread can write to the pipe at any specific moment. - // // T is the type of the object in the queue. - // If the template parameter D is set to true, it is quaranteed that - // the pipe will die in a finite time (so that you can swich to some - // other task). If D is set to false, reading from the pipe may result - // in an infinite cycle (if the pipe is continuosly fed by new elements). - // N is granularity of the pipe (how many elements have to be inserted - // till actual memory allocation is required). - - template class ypipe_t + // N is granularity of the pipe, i.e. how many messages are needed to + // perform next memory allocation. + + template class ypipe_t { public: - // Initialises the pipe. In D scenario it is created in dead state. - // Otherwise it's alive. - inline ypipe_t () : - stop (false) + // Initialises the pipe. + inline ypipe_t () { // Insert terminator element into the queue. queue.push (); - // Let all the pointers to point to the terminator + // Let all the pointers to point to the terminator. // (unless pipe is dead, in which case c is set to NULL). r = w = &queue.back (); - c.set (D ? NULL : &queue.back ()); + c.set (&queue.back ()); } // Following function (write) deliberately copies uninitialised data @@ -125,50 +118,17 @@ namespace zmq return true; // There's no prefetched value, so let us prefetch more values. - // (Note that D is a template parameter. Becaue of that one of - // the following branches will be completely optimised away - // by the compiler.) - if (D) { - - // If one prefetch was already done since last sleeping, - // don't do a new one, rather ask caller to go asleep. - if (stop) { - stop = false; - return false; - } - - // Get new items. Perform the operation in atomic fashion. - r = c.xchg (NULL); - - // If there are no elements prefetched, exit and go asleep. - // During pipe's lifetime r should never be NULL, however, - // during pipe shutdown when retrieving messages from it - // to deallocate them, this can happen. - if (&queue.front () == r || !r) { - stop = false; - return false; - } - - // We want to do only a single prefetch in D scenario - // before going asleep. Thus, we set stop variable to true - // so that we can return false next time the prefetch is - // attempted. - stop = true; - } - else { - - // Prefetching in non-D scenario is to simply retrieve the - // pointer from c in atomic fashion. If there are no - // items to prefetch, set c to NULL (using compare-and-swap). - r = c.cas (&queue.front (), NULL); - - // If there are no elements prefetched, exit. - // During pipe's lifetime r should never be NULL, however, - // it can happen during pipe shutdown when messages - // are being deallocated. - if (&queue.front () == r || !r) - return false; - } + // Prefetching is to simply retrieve the + // pointer from c in atomic fashion. If there are no + // items to prefetch, set c to NULL (using compare-and-swap). + r = c.cas (&queue.front (), NULL); + + // If there are no elements prefetched, exit. + // During pipe's lifetime r should never be NULL, however, + // it can happen during pipe shutdown when messages + // are being deallocated. + if (&queue.front () == r || !r) + return false; // There was at least one value prefetched. return true; @@ -211,11 +171,6 @@ namespace zmq // atomic operations. atomic_ptr_t c; - // Used only if 'D' template parameter is set to true. If true, - // prefetch was already done since last sleeping and the reader - // should go asleep instead of prefetching once more. - bool stop; - // Disable copying of ypipe object. ypipe_t (const ypipe_t&); void operator = (const ypipe_t&); -- cgit v1.2.3