summaryrefslogtreecommitdiff
path: root/src/ypipe.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/ypipe.hpp')
-rw-r--r--src/ypipe.hpp83
1 files changed, 19 insertions, 64 deletions
diff --git a/src/ypipe.hpp b/src/ypipe.hpp
index 83ae6a7..445b487 100644
--- a/src/ypipe.hpp
+++ b/src/ypipe.hpp
@@ -30,31 +30,24 @@ namespace zmq
// Lock-free queue implementation.
// Only a single thread can read from the pipe at any specific moment.
// Only a single thread can write to the pipe at any specific moment.
- //
// T is the type of the object in the queue.
- // If the template parameter D is set to true, it is quaranteed that
- // the pipe will die in a finite time (so that you can swich to some
- // other task). If D is set to false, reading from the pipe may result
- // in an infinite cycle (if the pipe is continuosly fed by new elements).
- // N is granularity of the pipe (how many elements have to be inserted
- // till actual memory allocation is required).
-
- template <typename T, bool D, int N> class ypipe_t
+ // N is granularity of the pipe, i.e. how many messages are needed to
+ // perform next memory allocation.
+
+ template <typename T, int N> class ypipe_t
{
public:
- // Initialises the pipe. In D scenario it is created in dead state.
- // Otherwise it's alive.
- inline ypipe_t () :
- stop (false)
+ // Initialises the pipe.
+ inline ypipe_t ()
{
// Insert terminator element into the queue.
queue.push ();
- // Let all the pointers to point to the terminator
+ // Let all the pointers to point to the terminator.
// (unless pipe is dead, in which case c is set to NULL).
r = w = &queue.back ();
- c.set (D ? NULL : &queue.back ());
+ c.set (&queue.back ());
}
// Following function (write) deliberately copies uninitialised data
@@ -125,50 +118,17 @@ namespace zmq
return true;
// There's no prefetched value, so let us prefetch more values.
- // (Note that D is a template parameter. Becaue of that one of
- // the following branches will be completely optimised away
- // by the compiler.)
- if (D) {
-
- // If one prefetch was already done since last sleeping,
- // don't do a new one, rather ask caller to go asleep.
- if (stop) {
- stop = false;
- return false;
- }
-
- // Get new items. Perform the operation in atomic fashion.
- r = c.xchg (NULL);
-
- // If there are no elements prefetched, exit and go asleep.
- // During pipe's lifetime r should never be NULL, however,
- // during pipe shutdown when retrieving messages from it
- // to deallocate them, this can happen.
- if (&queue.front () == r || !r) {
- stop = false;
- return false;
- }
-
- // We want to do only a single prefetch in D scenario
- // before going asleep. Thus, we set stop variable to true
- // so that we can return false next time the prefetch is
- // attempted.
- stop = true;
- }
- else {
-
- // Prefetching in non-D scenario is to simply retrieve the
- // pointer from c in atomic fashion. If there are no
- // items to prefetch, set c to NULL (using compare-and-swap).
- r = c.cas (&queue.front (), NULL);
-
- // If there are no elements prefetched, exit.
- // During pipe's lifetime r should never be NULL, however,
- // it can happen during pipe shutdown when messages
- // are being deallocated.
- if (&queue.front () == r || !r)
- return false;
- }
+ // Prefetching is to simply retrieve the
+ // pointer from c in atomic fashion. If there are no
+ // items to prefetch, set c to NULL (using compare-and-swap).
+ r = c.cas (&queue.front (), NULL);
+
+ // If there are no elements prefetched, exit.
+ // During pipe's lifetime r should never be NULL, however,
+ // it can happen during pipe shutdown when messages
+ // are being deallocated.
+ if (&queue.front () == r || !r)
+ return false;
// There was at least one value prefetched.
return true;
@@ -211,11 +171,6 @@ namespace zmq
// atomic operations.
atomic_ptr_t <T> c;
- // Used only if 'D' template parameter is set to true. If true,
- // prefetch was already done since last sleeping and the reader
- // should go asleep instead of prefetching once more.
- bool stop;
-
// Disable copying of ypipe object.
ypipe_t (const ypipe_t&);
void operator = (const ypipe_t&);