From 87a6490b39c44e8f9c521f6ccea14f800a712d3f Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 25 May 2011 10:25:51 +0200 Subject: All pipe termination code moved to pipe_t Till now the code was spread over mutliple locations. Additionally, the code was made more formally correct, with explicit pipe state machine etc. Signed-off-by: Martin Sustrik --- src/pipe.cpp | 147 +++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 108 insertions(+), 39 deletions(-) (limited to 'src/pipe.cpp') diff --git a/src/pipe.cpp b/src/pipe.cpp index 3d0c0a6..73e5aae 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -62,9 +62,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, peers_msgs_read (0), peer (NULL), sink (NULL), - terminating (false), - term_recvd (false), - delimited (false), + state (active), delay (delay_) { } @@ -89,7 +87,7 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_) bool zmq::pipe_t::check_read () { - if (unlikely (!in_active)) + if (unlikely (!in_active || (state != active && state != pending))) return false; // Check if there's an item in the pipe. @@ -104,13 +102,7 @@ bool zmq::pipe_t::check_read () msg_t msg; bool ok = inpipe->read (&msg); zmq_assert (ok); - delimited = true; - - // If pipe_term was already received but wasn't processed because - // of pending messages, we can ack it now. - if (term_recvd) - send_pipe_term_ack (peer); - + delimit (); return false; } @@ -119,7 +111,7 @@ bool zmq::pipe_t::check_read () bool zmq::pipe_t::read (msg_t *msg_) { - if (unlikely (!in_active)) + if (unlikely (!in_active || (state != active && state != pending))) return false; if (!inpipe->read (msg_)) { @@ -129,13 +121,7 @@ bool zmq::pipe_t::read (msg_t *msg_) // If delimiter was read, start termination process of the pipe. if (msg_->is_delimiter ()) { - delimited = true; - - // If pipe_term was already received but wasn't processed because - // of pending messages, we can ack it now. - if (term_recvd) - send_pipe_term_ack (peer); - + delimit (); return false; } @@ -150,7 +136,7 @@ bool zmq::pipe_t::read (msg_t *msg_) bool zmq::pipe_t::check_write (msg_t *msg_) { - if (unlikely (!out_active)) + if (unlikely (!out_active || state != active)) return false; bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm); @@ -188,13 +174,21 @@ void zmq::pipe_t::rollback () void zmq::pipe_t::flush () { + // If terminate() was already called do nothing. + if (state == terminated && state == double_terminated) + return; + + // The peer does not exist anymore at this point. + if (state == terminating) + return; + if (!outpipe->flush ()) send_activate_read (peer); } void zmq::pipe_t::process_activate_read () { - if (!in_active && !terminating) { + if (!in_active && (state == active || state == pending)) { in_active = true; sink->read_activated (this); } @@ -205,7 +199,7 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) // Remember the peers's message sequence number. peers_msgs_read = msgs_read_; - if (!out_active && !terminating) { + if (!out_active && state == active) { out_active = true; sink->write_activated (this); } @@ -213,16 +207,41 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) void zmq::pipe_t::process_pipe_term () { - term_recvd = true; - - // We can proceed with the termination if one of the following is true: - // 1. User asked this side of pipe to terminate already. - // 2. Waiting for pending messages in not required. - // 3. Delimiter was already received. - if (terminating || !delay || delimited) { - terminating = true; + // This is the simple case of peer-induced termination. If there are no + // more pending messages to read, or if the pipe was configured to drop + // pending messages, we can move directly to the terminating state. + // Otherwise we'll hang up in pending state till all the pending messages + // are sent. + if (state == active) { + if (!delay) { + state = terminating; + send_pipe_term_ack (peer); + } + else { + state = pending; + } + return; + } + + // Delimiter happened to arrive before the term command. Now we have the + // term command as well, so we can move straight to terminating state. + if (state == delimited) { + state = terminating; + send_pipe_term_ack (peer); + return; + } + + // This is the case where both ends of the pipe are closed in parallel. + // We simply reply to the request by ack and continue waiting for our + // own ack. + if (state == terminated) { + state = double_terminated; send_pipe_term_ack (peer); + return; } + + // pipe_term is invalid in other states. + zmq_assert (false); } void zmq::pipe_t::process_pipe_term_ack () @@ -231,10 +250,16 @@ void zmq::pipe_t::process_pipe_term_ack () zmq_assert (sink); sink->terminated (this); - // If the peer haven't asked for the termination itself, we have to - // ack the ack, so that it can deallocate properly. - if (!term_recvd) + // In terminating and double_terminated states there's nothing to do. + // Simply deallocate the pipe. In terminated state we have to ack the + // peer before deallocating this side of the pipe. All the other states + // are invalid. + if (state == terminating) ; + else if (state == double_terminated); + else if (state == terminated) send_pipe_term_ack (peer); + else + zmq_assert (false); // We'll deallocate the inbound pipe, the peer will deallocate the outbound // pipe (which is an inbound pipe from its point of view). @@ -254,10 +279,40 @@ void zmq::pipe_t::process_pipe_term_ack () void zmq::pipe_t::terminate () { - // Prevent double termination. - if (terminating) + // If terminate was already called, we can ignore the duplicit invocation. + if (state == terminated || state == double_terminated) return; - terminating = true; + + // If the pipe is in the final phase of async termination, it's going to + // closed anyway. No need to do anything special here. + else if (state == terminating) + return; + + // The simple sync termination case. Ask the peer to terminate and wait + // for the ack. + else if (state == active) { + send_pipe_term (peer); + state = terminated; + } + + // There are still pending messages available, but the user calls + // 'terminate'. We can act as if all the pending messages were read. + else if (state == pending) { + send_pipe_term_ack (peer); + state = terminating; + } + + // We've already got delimiter, but not term command yet. We can ignore + // the delimiter and ack synchronously terminate as if we were in + // active state. + else if (state == delimited) { + send_pipe_term (peer); + state = terminated; + } + + // There are no other states. + else + zmq_assert (false); // Stop inbound and outbound flow of messages. in_active = false; @@ -272,9 +327,6 @@ void zmq::pipe_t::terminate () msg.init_delimiter (); outpipe->write (msg, false); flush (); - - // Start the termination handshaking. - send_pipe_term (peer); } bool zmq::pipe_t::is_delimiter (msg_t &msg_) @@ -309,3 +361,20 @@ int zmq::pipe_t::compute_lwm (int hwm_) return result; } + +void zmq::pipe_t::delimit () +{ + if (state == active) { + state = delimited; + return; + } + + if (state == pending) { + send_pipe_term_ack (peer); + state = terminating; + return; + } + + // Delimiter in any other state is invalid. + zmq_assert (false); +} -- cgit v1.2.3