summaryrefslogtreecommitdiff
path: root/src/pipe.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-06-19 11:17:20 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-06-19 11:17:20 +0200
commit00dc0245e6aacbff247c84ac8480d3ddcabacd5a (patch)
tree989a0b59cc62709df11cfd2fc62806efbe5f6d16 /src/pipe.cpp
parent6052709c2aea5fae70d805e6033861c24b4f2521 (diff)
Race condition in pipe_t fixed.
pipe_t now correctly drops pointer to the underlying pipe when sending pipe_term_ack command. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/pipe.cpp')
-rw-r--r--src/pipe.cpp38
1 files changed, 25 insertions, 13 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 26f7d85..20c7f69 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -165,10 +165,12 @@ void zmq::pipe_t::rollback ()
{
// Remove incomplete message from the outbound pipe.
msg_t msg;
- while (outpipe->unwrite (&msg)) {
- zmq_assert (msg.flags () & msg_t::more);
- int rc = msg.close ();
- errno_assert (rc == 0);
+ if (outpipe) {
+ while (outpipe->unwrite (&msg)) {
+ zmq_assert (msg.flags () & msg_t::more);
+ int rc = msg.close ();
+ errno_assert (rc == 0);
+ }
}
}
@@ -238,6 +240,7 @@ void zmq::pipe_t::process_pipe_term ()
if (state == active) {
if (!delay) {
state = terminating;
+ outpipe = NULL;
send_pipe_term_ack (peer);
return;
}
@@ -251,6 +254,7 @@ void zmq::pipe_t::process_pipe_term ()
// term command as well, so we can move straight to terminating state.
if (state == delimited) {
state = terminating;
+ outpipe = NULL;
send_pipe_term_ack (peer);
return;
}
@@ -260,6 +264,7 @@ void zmq::pipe_t::process_pipe_term ()
// own ack.
if (state == terminated) {
state = double_terminated;
+ outpipe = NULL;
send_pipe_term_ack (peer);
return;
}
@@ -280,8 +285,10 @@ void zmq::pipe_t::process_pipe_term_ack ()
// are invalid.
if (state == terminating) ;
else if (state == double_terminated);
- else if (state == terminated)
+ else if (state == terminated) {
+ outpipe = NULL;
send_pipe_term_ack (peer);
+ }
else
zmq_assert (false);
@@ -325,6 +332,7 @@ void zmq::pipe_t::terminate (bool delay_)
// There are still pending messages available, but the user calls
// 'terminate'. We can act as if all the pending messages were read.
else if (state == pending && !delay) {
+ outpipe = NULL;
send_pipe_term_ack (peer);
state = terminating;
}
@@ -348,15 +356,18 @@ void zmq::pipe_t::terminate (bool delay_)
// Stop outbound flow of messages.
out_active = false;
- // Rollback any unfinished outbound messages.
- rollback ();
+ if (outpipe) {
- // Push delimiter into the outbound pipe. Note that watermarks are not
- // checked thus the delimiter can be written even though the pipe is full.
- msg_t msg;
- msg.init_delimiter ();
- outpipe->write (msg, false);
- flush ();
+ // Rollback any unfinished outbound messages.
+ rollback ();
+
+ // Push delimiter into the outbound pipe. Note that watermarks are not
+ // checked thus the delimiter can be written even though the pipe is full.
+ msg_t msg;
+ msg.init_delimiter ();
+ outpipe->write (msg, false);
+ flush ();
+ }
}
bool zmq::pipe_t::is_delimiter (msg_t &msg_)
@@ -400,6 +411,7 @@ void zmq::pipe_t::delimit ()
}
if (state == pending) {
+ outpipe = NULL;
send_pipe_term_ack (peer);
state = terminating;
return;