From 87a6490b39c44e8f9c521f6ccea14f800a712d3f Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 25 May 2011 10:25:51 +0200 Subject: All pipe termination code moved to pipe_t Till now the code was spread over mutliple locations. Additionally, the code was made more formally correct, with explicit pipe state machine etc. Signed-off-by: Martin Sustrik --- src/pipe.cpp | 147 ++++++++++++++++++++++++++++++++++------------- src/pipe.hpp | 29 +++++++--- src/session.cpp | 161 +++++++++++++++++++++++----------------------------- src/session.hpp | 27 +++------ src/socket_base.cpp | 4 +- 5 files changed, 208 insertions(+), 160 deletions(-) (limited to 'src') diff --git a/src/pipe.cpp b/src/pipe.cpp index 3d0c0a6..73e5aae 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -62,9 +62,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, peers_msgs_read (0), peer (NULL), sink (NULL), - terminating (false), - term_recvd (false), - delimited (false), + state (active), delay (delay_) { } @@ -89,7 +87,7 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_) bool zmq::pipe_t::check_read () { - if (unlikely (!in_active)) + if (unlikely (!in_active || (state != active && state != pending))) return false; // Check if there's an item in the pipe. @@ -104,13 +102,7 @@ bool zmq::pipe_t::check_read () msg_t msg; bool ok = inpipe->read (&msg); zmq_assert (ok); - delimited = true; - - // If pipe_term was already received but wasn't processed because - // of pending messages, we can ack it now. - if (term_recvd) - send_pipe_term_ack (peer); - + delimit (); return false; } @@ -119,7 +111,7 @@ bool zmq::pipe_t::check_read () bool zmq::pipe_t::read (msg_t *msg_) { - if (unlikely (!in_active)) + if (unlikely (!in_active || (state != active && state != pending))) return false; if (!inpipe->read (msg_)) { @@ -129,13 +121,7 @@ bool zmq::pipe_t::read (msg_t *msg_) // If delimiter was read, start termination process of the pipe. if (msg_->is_delimiter ()) { - delimited = true; - - // If pipe_term was already received but wasn't processed because - // of pending messages, we can ack it now. - if (term_recvd) - send_pipe_term_ack (peer); - + delimit (); return false; } @@ -150,7 +136,7 @@ bool zmq::pipe_t::read (msg_t *msg_) bool zmq::pipe_t::check_write (msg_t *msg_) { - if (unlikely (!out_active)) + if (unlikely (!out_active || state != active)) return false; bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm); @@ -188,13 +174,21 @@ void zmq::pipe_t::rollback () void zmq::pipe_t::flush () { + // If terminate() was already called do nothing. + if (state == terminated && state == double_terminated) + return; + + // The peer does not exist anymore at this point. + if (state == terminating) + return; + if (!outpipe->flush ()) send_activate_read (peer); } void zmq::pipe_t::process_activate_read () { - if (!in_active && !terminating) { + if (!in_active && (state == active || state == pending)) { in_active = true; sink->read_activated (this); } @@ -205,7 +199,7 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) // Remember the peers's message sequence number. peers_msgs_read = msgs_read_; - if (!out_active && !terminating) { + if (!out_active && state == active) { out_active = true; sink->write_activated (this); } @@ -213,16 +207,41 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) void zmq::pipe_t::process_pipe_term () { - term_recvd = true; - - // We can proceed with the termination if one of the following is true: - // 1. User asked this side of pipe to terminate already. - // 2. Waiting for pending messages in not required. - // 3. Delimiter was already received. - if (terminating || !delay || delimited) { - terminating = true; + // This is the simple case of peer-induced termination. If there are no + // more pending messages to read, or if the pipe was configured to drop + // pending messages, we can move directly to the terminating state. + // Otherwise we'll hang up in pending state till all the pending messages + // are sent. + if (state == active) { + if (!delay) { + state = terminating; + send_pipe_term_ack (peer); + } + else { + state = pending; + } + return; + } + + // Delimiter happened to arrive before the term command. Now we have the + // term command as well, so we can move straight to terminating state. + if (state == delimited) { + state = terminating; + send_pipe_term_ack (peer); + return; + } + + // This is the case where both ends of the pipe are closed in parallel. + // We simply reply to the request by ack and continue waiting for our + // own ack. + if (state == terminated) { + state = double_terminated; send_pipe_term_ack (peer); + return; } + + // pipe_term is invalid in other states. + zmq_assert (false); } void zmq::pipe_t::process_pipe_term_ack () @@ -231,10 +250,16 @@ void zmq::pipe_t::process_pipe_term_ack () zmq_assert (sink); sink->terminated (this); - // If the peer haven't asked for the termination itself, we have to - // ack the ack, so that it can deallocate properly. - if (!term_recvd) + // In terminating and double_terminated states there's nothing to do. + // Simply deallocate the pipe. In terminated state we have to ack the + // peer before deallocating this side of the pipe. All the other states + // are invalid. + if (state == terminating) ; + else if (state == double_terminated); + else if (state == terminated) send_pipe_term_ack (peer); + else + zmq_assert (false); // We'll deallocate the inbound pipe, the peer will deallocate the outbound // pipe (which is an inbound pipe from its point of view). @@ -254,10 +279,40 @@ void zmq::pipe_t::process_pipe_term_ack () void zmq::pipe_t::terminate () { - // Prevent double termination. - if (terminating) + // If terminate was already called, we can ignore the duplicit invocation. + if (state == terminated || state == double_terminated) return; - terminating = true; + + // If the pipe is in the final phase of async termination, it's going to + // closed anyway. No need to do anything special here. + else if (state == terminating) + return; + + // The simple sync termination case. Ask the peer to terminate and wait + // for the ack. + else if (state == active) { + send_pipe_term (peer); + state = terminated; + } + + // There are still pending messages available, but the user calls + // 'terminate'. We can act as if all the pending messages were read. + else if (state == pending) { + send_pipe_term_ack (peer); + state = terminating; + } + + // We've already got delimiter, but not term command yet. We can ignore + // the delimiter and ack synchronously terminate as if we were in + // active state. + else if (state == delimited) { + send_pipe_term (peer); + state = terminated; + } + + // There are no other states. + else + zmq_assert (false); // Stop inbound and outbound flow of messages. in_active = false; @@ -272,9 +327,6 @@ void zmq::pipe_t::terminate () msg.init_delimiter (); outpipe->write (msg, false); flush (); - - // Start the termination handshaking. - send_pipe_term (peer); } bool zmq::pipe_t::is_delimiter (msg_t &msg_) @@ -309,3 +361,20 @@ int zmq::pipe_t::compute_lwm (int hwm_) return result; } + +void zmq::pipe_t::delimit () +{ + if (state == active) { + state = delimited; + return; + } + + if (state == pending) { + send_pipe_term_ack (peer); + state = terminating; + return; + } + + // Delimiter in any other state is invalid. + zmq_assert (false); +} diff --git a/src/pipe.hpp b/src/pipe.hpp index f821bdd..3087ab8 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -99,6 +99,9 @@ namespace zmq void process_pipe_term (); void process_pipe_term_ack (); + // Handler for delimiter read from the pipe. + void delimit (); + // Type of the underlying lock-free pipe. typedef ypipe_t upipe_t; @@ -142,15 +145,23 @@ namespace zmq // Sink to send events to. i_pipe_events *sink; - // True is 'terminate' method was called or termination request - // was received from the peer. - bool terminating; - - // True is we've already got pipe_term command from the peer. - bool term_recvd; - - // True if delimiter was already received from the peer. - bool delimited; + // State of the pipe endpoint. Active is common state before any + // termination begins. Delimited means that delimiter was read from + // pipe before term command was received. Pending means that term + // command was already received from the peer but there are still + // pending messages to read. Terminating means that all pending + // messages were already read and all we are waiting for is ack from + // the peer. Terminated means that 'terminate' was explicitly called + // by the user. Double_terminated means that user called 'terminate' + // and then we've got term command from the peer as well. + enum { + active, + delimited, + pending, + terminating, + terminated, + double_terminated + } state; // If true, we receive all the pending inbound messages before // terminating. If false, we terminate immediately when the peer diff --git a/src/session.cpp b/src/session.cpp index 5ef21c7..bff452e 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -31,47 +31,35 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, io_object_t (io_thread_), pipe (NULL), incomplete_in (false), + terminating (false), engine (NULL), socket (socket_), io_thread (io_thread_), - pipe_attached (false), - delimiter_processed (false), - force_terminate (false), - has_linger_timer (false), - state (active) -{ + has_linger_timer (false) +{ } zmq::session_t::~session_t () { zmq_assert (!pipe); - if (engine) - engine->terminate (); -} - -void zmq::session_t::proceed_with_term () -{ - if (state == terminating) - return; - - zmq_assert (state == pending); - state = terminating; - // If there's still a pending linger timer, remove it. if (has_linger_timer) { cancel_timer (linger_timer_id); has_linger_timer = false; } - if (pipe) { - register_term_acks (1); - pipe->terminate (); - } + // Close the engine. + if (engine) + engine->terminate (); +} - // The session has already waited for the linger period. We don't want - // the child objects to linger any more thus linger is set to zero. - own_t::process_term (0); +void zmq::session_t::attach_pipe (pipe_t *pipe_) +{ + zmq_assert (!pipe); + zmq_assert (pipe_); + pipe = pipe_; + pipe->set_event_sink (this); } bool zmq::session_t::read (msg_t *msg_) @@ -127,37 +115,13 @@ void zmq::session_t::clean_pipes () } } -void zmq::session_t::attach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) -{ - zmq_assert (!pipe_attached); - pipe_attached = true; - - if (pipe_) { - zmq_assert (!pipe); - pipe = pipe_; - pipe->set_event_sink (this); - } - - // If we are already terminating, terminate the pipes straight away. - if (state == terminating) { - if (pipe) { - pipe->terminate (); - register_term_acks (1); - } - } -} - void zmq::session_t::terminated (pipe_t *pipe_) { + // Drop the reference to the deallocated pipe. zmq_assert (pipe == pipe_); - - // If we are in process of being closed, but still waiting for all - // pending messeges being sent, we can terminate here. - if (state == pending) - proceed_with_term (); - pipe = NULL; - if (state == terminating) + + if (terminating) unregister_term_ack (); } @@ -189,7 +153,7 @@ void zmq::session_t::process_attach (i_engine *engine_, // If we are already terminating, we destroy the engine straight away. // Note that we don't have to unplug it before deleting as it's not // yet plugged to the session. - if (state == terminating) { + if (terminating) { if (engine_) delete engine_; return; @@ -209,12 +173,8 @@ void zmq::session_t::process_attach (i_engine *engine_, return; } - // Check whether the required pipe already exists and create it - // if it does not. - if (!pipe_attached) { - zmq_assert (!pipe); - pipe_attached = true; - + // Create the pipe if it does not exist yet. + if (!pipe) { object_t *parents [2] = {this, socket}; pipe_t *pipes [2] = {NULL, NULL}; int hwms [2] = {options.rcvhwm, options.sndhwm}; @@ -226,6 +186,7 @@ void zmq::session_t::process_attach (i_engine *engine_, pipes [0]->set_event_sink (this); // Remember the local end of the pipe. + zmq_assert (!pipe); pipe = pipes [0]; // Ask socket to plug into the remote end of the pipe. @@ -249,43 +210,45 @@ void zmq::session_t::detach () // Send the event to the derived class. detached (); - // Just in case there's only a delimiter in the inbound pipe. + // Just in case there's only a delimiter in the pipe. if (pipe) pipe->check_read (); } void zmq::session_t::process_term (int linger_) { - zmq_assert (state == active); - state = pending; - - // If linger is set to zero, we can terminate the session straight away - // not waiting for the pending messages to be sent. - if (linger_ == 0) { - proceed_with_term (); - return; + // If termination is already underway, do nothing. + if (!terminating) { + + terminating = true; + + // If the termination of the pipe happens before the term command is + // delivered there's nothing much to do. We can proceed with the + // stadard termination immediately. + if (pipe) { + + // We're going to wait till the pipe terminates. + register_term_acks (1); + + // If linger is set to zero, we can ask pipe to terminate without + // waiting for pending messages to be read. + if (linger_ == 0) + pipe->terminate (); + + // If there's finite linger value, set up a timer. + if (linger_ > 0) { + zmq_assert (!has_linger_timer); + add_timer (linger_, linger_timer_id); + has_linger_timer = true; + } + + // In case there's no engine and there's only delimiter in the pipe it + // wouldn't be ever read. Thus we check for it explicitly. + pipe->check_read (); + } } - // If there's finite linger value, set up a timer. - if (linger_ > 0) { - zmq_assert (!has_linger_timer); - add_timer (linger_, linger_timer_id); - has_linger_timer = true; - } - - // If there's no engine and there's only delimiter in the pipe it wouldn't - // be ever read. Thus we check for it explicitly. - if (pipe) - pipe->check_read (); - - // If there's no in pipe, there are no pending messages to send. - // We can proceed with the shutdown straight away. Also, if there is - // pipe, but the delimiter was already processed, we can terminate - // immediately. Alternatively, if the derived session type have - // called 'terminate' we'll finish straight away. - if (delimiter_processed || force_terminate || - (!options.immediate_connect && !pipe)) - proceed_with_term (); + own_t::process_term (0); } void zmq::session_t::timer_event (int id_) @@ -294,7 +257,10 @@ void zmq::session_t::timer_event (int id_) // there are still pending messages to be sent. zmq_assert (id_ == linger_timer_id); has_linger_timer = false; - proceed_with_term (); + + // Ask pipe to terminate even though there may be pending messages in it. + zmq_assert (pipe); + pipe->terminate (); } bool zmq::session_t::has_engine () @@ -314,6 +280,19 @@ void zmq::session_t::unregister_session (const blob_t &name_) void zmq::session_t::terminate () { - force_terminate = true; - own_t::terminate (); + // If termination process is already underway, do nothing. + if (!terminating) { + terminating = true; + + // If the pipe was already terminated, there's nothing much to do. + // If it wasn't, we'll ask it to terminate. + if (pipe) { + + register_term_acks (1); + pipe->terminate (); + } + } + + own_t::terminate (); } + diff --git a/src/session.hpp b/src/session.hpp index 4a12d68..f1564d8 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -41,6 +41,9 @@ namespace zmq session_t (class io_thread_t *io_thread_, class socket_base_t *socket_, const options_t &options_); + // To be used once only, when creating the session. + void attach_pipe (class pipe_t *pipe_); + // i_inout interface implementation. Note that detach method is not // implemented by generic session. Different session types may handle // engine disconnection in different ways. @@ -49,8 +52,6 @@ namespace zmq void flush (); void detach (); - void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); - // i_pipe_events interface implementation. void read_activated (class pipe_t *pipe_); void write_activated (class pipe_t *pipe_); @@ -59,7 +60,7 @@ namespace zmq protected: // This function allows to shut down the session even though - // there are pending messages in the inbound pipe. + // there are messages pending. void terminate (); // Two events for the derived session type. Attached is triggered @@ -104,6 +105,10 @@ namespace zmq // is still in the in pipe. bool incomplete_in; + // If true the termination process is already underway, ie. term ack + // for the pipe was already registered etc. + bool terminating; + // The protocol I/O engine connected to the session. struct i_engine *engine; @@ -114,28 +119,12 @@ namespace zmq // the engines into the same thread. class io_thread_t *io_thread; - // If true, pipe was already attached to this session. - bool pipe_attached; - - // If true, delimiter was already read from the inbound pipe. - bool delimiter_processed; - - // If true, we should terminate the session even though there are - // pending messages in the inbound pipe. - bool force_terminate; - // ID of the linger timer enum {linger_timer_id = 0x20}; // True is linger timer is running. bool has_linger_timer; - enum { - active, - pending, - terminating - } state; - session_t (const session_t&); const session_t &operator = (const session_t&); }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fae55f2..1682c05 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -450,8 +450,8 @@ int zmq::socket_base_t::connect (const char *addr_) // Attach local end of the pipe to the socket object. attach_pipe (pipes [0], blob_t ()); - // Attach remote end of the pipe to the session object. - session->attach_pipe (pipes [1], blob_t ()); + // Attach remote end of the pipe to the session object later on. + session->attach_pipe (pipes [1]); } // Activate the session. Make it a child of this socket. -- cgit v1.2.3