From 10533a560b4af1d3dae63c87c737e25bbdb78998 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Wed, 14 Jul 2010 18:31:17 +0200 Subject: pipe: check_read() should check for message delimiter --- src/pipe.cpp | 27 ++++++++++++++++++++++----- src/pipe.hpp | 3 +++ src/ypipe.hpp | 11 +++++++++++ 3 files changed, 36 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/pipe.cpp b/src/pipe.cpp index e2c3c4d..200beb0 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -44,15 +44,32 @@ void zmq::reader_t::set_pipe (pipe_t *pipe_) register_pipe (pipe); } +bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_) +{ + unsigned char *offset = 0; + + return msg_.content == (void*) (offset + ZMQ_DELIMITER); +} + bool zmq::reader_t::check_read () { // Check if there's an item in the pipe. - if (pipe->check_read ()) - return true; - // If not, deactivate the pipe. - endpoint->kill (this); - return false; + if (!pipe->check_read ()) { + endpoint->kill (this); + return false; + } + + // If the next item in the pipe is message delimiter, + // initiate its termination. + if (pipe->probe (is_delimiter)) { + if (endpoint) + endpoint->detach_inpipe (this); + term (); + return false; + } + + return true; } bool zmq::reader_t::read (zmq_msg_t *msg_) diff --git a/src/pipe.hpp b/src/pipe.hpp index a3516b5..ece678a 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -58,6 +58,9 @@ namespace zmq void process_revive (); void process_pipe_term_ack (); + // Returns true if the message is delimiter; false otherwise. + static bool is_delimiter (zmq_msg_t &msg_); + // The underlying pipe. class pipe_t *pipe; diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 2a2d725..26f021c 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -162,6 +162,17 @@ namespace zmq return true; } + // Applies the function fn to the first elemenent in the pipe + // and returns the value returned by the fn. + // The pipe mustn't be empty or the function crashes. + inline bool probe (bool (*fn)(T &)) + { + bool rc = check_read (); + zmq_assert (rc); + + return (*fn) (queue.front ()); + } + protected: // Allocation-efficient queue to store pipe items. -- cgit v1.2.3