diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-06-19 11:17:20 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-06-19 11:17:20 +0200 |
commit | 00dc0245e6aacbff247c84ac8480d3ddcabacd5a (patch) | |
tree | 989a0b59cc62709df11cfd2fc62806efbe5f6d16 | |
parent | 6052709c2aea5fae70d805e6033861c24b4f2521 (diff) |
Race condition in pipe_t fixed.
pipe_t now correctly drops pointer to the underlying pipe when
sending pipe_term_ack command.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r-- | src/pipe.cpp | 38 |
1 files changed, 25 insertions, 13 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index 26f7d85..20c7f69 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -165,10 +165,12 @@ void zmq::pipe_t::rollback () { // Remove incomplete message from the outbound pipe. msg_t msg; - while (outpipe->unwrite (&msg)) { - zmq_assert (msg.flags () & msg_t::more); - int rc = msg.close (); - errno_assert (rc == 0); + if (outpipe) { + while (outpipe->unwrite (&msg)) { + zmq_assert (msg.flags () & msg_t::more); + int rc = msg.close (); + errno_assert (rc == 0); + } } } @@ -238,6 +240,7 @@ void zmq::pipe_t::process_pipe_term () if (state == active) { if (!delay) { state = terminating; + outpipe = NULL; send_pipe_term_ack (peer); return; } @@ -251,6 +254,7 @@ void zmq::pipe_t::process_pipe_term () // term command as well, so we can move straight to terminating state. if (state == delimited) { state = terminating; + outpipe = NULL; send_pipe_term_ack (peer); return; } @@ -260,6 +264,7 @@ void zmq::pipe_t::process_pipe_term () // own ack. if (state == terminated) { state = double_terminated; + outpipe = NULL; send_pipe_term_ack (peer); return; } @@ -280,8 +285,10 @@ void zmq::pipe_t::process_pipe_term_ack () // are invalid. if (state == terminating) ; else if (state == double_terminated); - else if (state == terminated) + else if (state == terminated) { + outpipe = NULL; send_pipe_term_ack (peer); + } else zmq_assert (false); @@ -325,6 +332,7 @@ void zmq::pipe_t::terminate (bool delay_) // There are still pending messages available, but the user calls // 'terminate'. We can act as if all the pending messages were read. else if (state == pending && !delay) { + outpipe = NULL; send_pipe_term_ack (peer); state = terminating; } @@ -348,15 +356,18 @@ void zmq::pipe_t::terminate (bool delay_) // Stop outbound flow of messages. out_active = false; - // Rollback any unfinished outbound messages. - rollback (); + if (outpipe) { - // Push delimiter into the outbound pipe. Note that watermarks are not - // checked thus the delimiter can be written even though the pipe is full. - msg_t msg; - msg.init_delimiter (); - outpipe->write (msg, false); - flush (); + // Rollback any unfinished outbound messages. + rollback (); + + // Push delimiter into the outbound pipe. Note that watermarks are not + // checked thus the delimiter can be written even though the pipe is full. + msg_t msg; + msg.init_delimiter (); + outpipe->write (msg, false); + flush (); + } } bool zmq::pipe_t::is_delimiter (msg_t &msg_) @@ -400,6 +411,7 @@ void zmq::pipe_t::delimit () } if (state == pending) { + outpipe = NULL; send_pipe_term_ack (peer); state = terminating; return; |