diff options
-rw-r--r-- | src/pipe.cpp | 11 | ||||
-rw-r--r-- | src/pipe.hpp | 3 | ||||
-rw-r--r-- | src/ypipe.hpp | 24 |
3 files changed, 30 insertions, 8 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index f8dfcb8..e444520 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -36,6 +36,17 @@ zmq::reader_t::~reader_t () { } +bool zmq::reader_t::check_read () +{ + // Check if there's an item in the pipe. + if (pipe->check_read ()) + return true; + + // If not, deactivate the pipe. + endpoint->kill (this); + return false; +} + bool zmq::reader_t::read (zmq_msg_t *msg_) { if (!pipe->read (msg_)) { diff --git a/src/pipe.hpp b/src/pipe.hpp index 699e3f7..ecbce7d 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -42,6 +42,9 @@ namespace zmq void set_endpoint (i_endpoint *endpoint_); + // Returns true if there is at least one message to read in the pipe. + bool check_read (); + // Reads a message to the underlying pipe. bool read (zmq_msg_t *msg_); diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 6c51b63..0a9b5d5 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -106,16 +106,12 @@ namespace zmq return true; } - // Reads an item from the pipe. Returns false if there is no value. - // available. - inline bool read (T *value_) + // Check whether item is available for reading. + inline bool check_read () { - // Was the value prefetched already? If so, return it. - if (&queue.front () != r) { - *value_ = queue.front (); - queue.pop (); + // Was the value prefetched already? If so, return. + if (&queue.front () != r) return true; - } // There's no prefetched value, so let us prefetch more values. // (Note that D is a template parameter. Becaue of that one of @@ -166,6 +162,18 @@ namespace zmq } // There was at least one value prefetched. + return true; + } + + // Reads an item from the pipe. Returns false if there is no value. + // available. + inline bool read (T *value_) + { + // Try to prefetch a value. + if (!check_read ()) + return false; + + // There was at least one value prefetched. // Return it to the caller. *value_ = queue.front (); queue.pop (); |