From 00dc0245e6aacbff247c84ac8480d3ddcabacd5a Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 19 Jun 2011 11:17:20 +0200 Subject: Race condition in pipe_t fixed. pipe_t now correctly drops pointer to the underlying pipe when sending pipe_term_ack command. Signed-off-by: Martin Sustrik --- src/pipe.cpp | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) (limited to 'src/pipe.cpp') diff --git a/src/pipe.cpp b/src/pipe.cpp index 26f7d85..20c7f69 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -165,10 +165,12 @@ void zmq::pipe_t::rollback () { // Remove incomplete message from the outbound pipe. msg_t msg; - while (outpipe->unwrite (&msg)) { - zmq_assert (msg.flags () & msg_t::more); - int rc = msg.close (); - errno_assert (rc == 0); + if (outpipe) { + while (outpipe->unwrite (&msg)) { + zmq_assert (msg.flags () & msg_t::more); + int rc = msg.close (); + errno_assert (rc == 0); + } } } @@ -238,6 +240,7 @@ void zmq::pipe_t::process_pipe_term () if (state == active) { if (!delay) { state = terminating; + outpipe = NULL; send_pipe_term_ack (peer); return; } @@ -251,6 +254,7 @@ void zmq::pipe_t::process_pipe_term () // term command as well, so we can move straight to terminating state. if (state == delimited) { state = terminating; + outpipe = NULL; send_pipe_term_ack (peer); return; } @@ -260,6 +264,7 @@ void zmq::pipe_t::process_pipe_term () // own ack. if (state == terminated) { state = double_terminated; + outpipe = NULL; send_pipe_term_ack (peer); return; } @@ -280,8 +285,10 @@ void zmq::pipe_t::process_pipe_term_ack () // are invalid. if (state == terminating) ; else if (state == double_terminated); - else if (state == terminated) + else if (state == terminated) { + outpipe = NULL; send_pipe_term_ack (peer); + } else zmq_assert (false); @@ -325,6 +332,7 @@ void zmq::pipe_t::terminate (bool delay_) // There are still pending messages available, but the user calls // 'terminate'. We can act as if all the pending messages were read. else if (state == pending && !delay) { + outpipe = NULL; send_pipe_term_ack (peer); state = terminating; } @@ -348,15 +356,18 @@ void zmq::pipe_t::terminate (bool delay_) // Stop outbound flow of messages. out_active = false; - // Rollback any unfinished outbound messages. - rollback (); + if (outpipe) { - // Push delimiter into the outbound pipe. Note that watermarks are not - // checked thus the delimiter can be written even though the pipe is full. - msg_t msg; - msg.init_delimiter (); - outpipe->write (msg, false); - flush (); + // Rollback any unfinished outbound messages. + rollback (); + + // Push delimiter into the outbound pipe. Note that watermarks are not + // checked thus the delimiter can be written even though the pipe is full. + msg_t msg; + msg.init_delimiter (); + outpipe->write (msg, false); + flush (); + } } bool zmq::pipe_t::is_delimiter (msg_t &msg_) @@ -400,6 +411,7 @@ void zmq::pipe_t::delimit () } if (state == pending) { + outpipe = NULL; send_pipe_term_ack (peer); state = terminating; return; -- cgit v1.2.3