summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/pipe.cpp11
-rw-r--r--src/pipe.hpp3
-rw-r--r--src/ypipe.hpp24
3 files changed, 30 insertions, 8 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index f8dfcb8..e444520 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -36,6 +36,17 @@ zmq::reader_t::~reader_t ()
{
}
+bool zmq::reader_t::check_read ()
+{
+ // Check if there's an item in the pipe.
+ if (pipe->check_read ())
+ return true;
+
+ // If not, deactivate the pipe.
+ endpoint->kill (this);
+ return false;
+}
+
bool zmq::reader_t::read (zmq_msg_t *msg_)
{
if (!pipe->read (msg_)) {
diff --git a/src/pipe.hpp b/src/pipe.hpp
index 699e3f7..ecbce7d 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -42,6 +42,9 @@ namespace zmq
void set_endpoint (i_endpoint *endpoint_);
+ // Returns true if there is at least one message to read in the pipe.
+ bool check_read ();
+
// Reads a message to the underlying pipe.
bool read (zmq_msg_t *msg_);
diff --git a/src/ypipe.hpp b/src/ypipe.hpp
index 6c51b63..0a9b5d5 100644
--- a/src/ypipe.hpp
+++ b/src/ypipe.hpp
@@ -106,16 +106,12 @@ namespace zmq
return true;
}
- // Reads an item from the pipe. Returns false if there is no value.
- // available.
- inline bool read (T *value_)
+ // Check whether item is available for reading.
+ inline bool check_read ()
{
- // Was the value prefetched already? If so, return it.
- if (&queue.front () != r) {
- *value_ = queue.front ();
- queue.pop ();
+ // Was the value prefetched already? If so, return.
+ if (&queue.front () != r)
return true;
- }
// There's no prefetched value, so let us prefetch more values.
// (Note that D is a template parameter. Becaue of that one of
@@ -166,6 +162,18 @@ namespace zmq
}
// There was at least one value prefetched.
+ return true;
+ }
+
+ // Reads an item from the pipe. Returns false if there is no value.
+ // available.
+ inline bool read (T *value_)
+ {
+ // Try to prefetch a value.
+ if (!check_read ())
+ return false;
+
+ // There was at least one value prefetched.
// Return it to the caller.
*value_ = queue.front ();
queue.pop ();