diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/pipe.cpp | 27 | ||||
-rw-r--r-- | src/pipe.hpp | 3 | ||||
-rw-r--r-- | src/ypipe.hpp | 11 |
3 files changed, 36 insertions, 5 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index e2c3c4d..200beb0 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -44,15 +44,32 @@ void zmq::reader_t::set_pipe (pipe_t *pipe_) register_pipe (pipe); } +bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_) +{ + unsigned char *offset = 0; + + return msg_.content == (void*) (offset + ZMQ_DELIMITER); +} + bool zmq::reader_t::check_read () { // Check if there's an item in the pipe. - if (pipe->check_read ()) - return true; - // If not, deactivate the pipe. - endpoint->kill (this); - return false; + if (!pipe->check_read ()) { + endpoint->kill (this); + return false; + } + + // If the next item in the pipe is message delimiter, + // initiate its termination. + if (pipe->probe (is_delimiter)) { + if (endpoint) + endpoint->detach_inpipe (this); + term (); + return false; + } + + return true; } bool zmq::reader_t::read (zmq_msg_t *msg_) diff --git a/src/pipe.hpp b/src/pipe.hpp index a3516b5..ece678a 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -58,6 +58,9 @@ namespace zmq void process_revive (); void process_pipe_term_ack (); + // Returns true if the message is delimiter; false otherwise. + static bool is_delimiter (zmq_msg_t &msg_); + // The underlying pipe. class pipe_t *pipe; diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 2a2d725..26f021c 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -162,6 +162,17 @@ namespace zmq return true; } + // Applies the function fn to the first elemenent in the pipe + // and returns the value returned by the fn. + // The pipe mustn't be empty or the function crashes. + inline bool probe (bool (*fn)(T &)) + { + bool rc = check_read (); + zmq_assert (rc); + + return (*fn) (queue.front ()); + } + protected: // Allocation-efficient queue to store pipe items. |