summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/pipe.cpp27
-rw-r--r--src/pipe.hpp3
-rw-r--r--src/ypipe.hpp11
3 files changed, 36 insertions, 5 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index e2c3c4d..200beb0 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -44,15 +44,32 @@ void zmq::reader_t::set_pipe (pipe_t *pipe_)
register_pipe (pipe);
}
+bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_)
+{
+ unsigned char *offset = 0;
+
+ return msg_.content == (void*) (offset + ZMQ_DELIMITER);
+}
+
bool zmq::reader_t::check_read ()
{
// Check if there's an item in the pipe.
- if (pipe->check_read ())
- return true;
-
// If not, deactivate the pipe.
- endpoint->kill (this);
- return false;
+ if (!pipe->check_read ()) {
+ endpoint->kill (this);
+ return false;
+ }
+
+ // If the next item in the pipe is message delimiter,
+ // initiate its termination.
+ if (pipe->probe (is_delimiter)) {
+ if (endpoint)
+ endpoint->detach_inpipe (this);
+ term ();
+ return false;
+ }
+
+ return true;
}
bool zmq::reader_t::read (zmq_msg_t *msg_)
diff --git a/src/pipe.hpp b/src/pipe.hpp
index a3516b5..ece678a 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -58,6 +58,9 @@ namespace zmq
void process_revive ();
void process_pipe_term_ack ();
+ // Returns true if the message is delimiter; false otherwise.
+ static bool is_delimiter (zmq_msg_t &msg_);
+
// The underlying pipe.
class pipe_t *pipe;
diff --git a/src/ypipe.hpp b/src/ypipe.hpp
index 2a2d725..26f021c 100644
--- a/src/ypipe.hpp
+++ b/src/ypipe.hpp
@@ -162,6 +162,17 @@ namespace zmq
return true;
}
+ // Applies the function fn to the first elemenent in the pipe
+ // and returns the value returned by the fn.
+ // The pipe mustn't be empty or the function crashes.
+ inline bool probe (bool (*fn)(T &))
+ {
+ bool rc = check_read ();
+ zmq_assert (rc);
+
+ return (*fn) (queue.front ());
+ }
+
protected:
// Allocation-efficient queue to store pipe items.