From 87a6490b39c44e8f9c521f6ccea14f800a712d3f Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 25 May 2011 10:25:51 +0200 Subject: All pipe termination code moved to pipe_t Till now the code was spread over mutliple locations. Additionally, the code was made more formally correct, with explicit pipe state machine etc. Signed-off-by: Martin Sustrik --- src/session.cpp | 161 ++++++++++++++++++++++++-------------------------------- 1 file changed, 70 insertions(+), 91 deletions(-) (limited to 'src/session.cpp') diff --git a/src/session.cpp b/src/session.cpp index 5ef21c7..bff452e 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -31,47 +31,35 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, io_object_t (io_thread_), pipe (NULL), incomplete_in (false), + terminating (false), engine (NULL), socket (socket_), io_thread (io_thread_), - pipe_attached (false), - delimiter_processed (false), - force_terminate (false), - has_linger_timer (false), - state (active) -{ + has_linger_timer (false) +{ } zmq::session_t::~session_t () { zmq_assert (!pipe); - if (engine) - engine->terminate (); -} - -void zmq::session_t::proceed_with_term () -{ - if (state == terminating) - return; - - zmq_assert (state == pending); - state = terminating; - // If there's still a pending linger timer, remove it. if (has_linger_timer) { cancel_timer (linger_timer_id); has_linger_timer = false; } - if (pipe) { - register_term_acks (1); - pipe->terminate (); - } + // Close the engine. + if (engine) + engine->terminate (); +} - // The session has already waited for the linger period. We don't want - // the child objects to linger any more thus linger is set to zero. - own_t::process_term (0); +void zmq::session_t::attach_pipe (pipe_t *pipe_) +{ + zmq_assert (!pipe); + zmq_assert (pipe_); + pipe = pipe_; + pipe->set_event_sink (this); } bool zmq::session_t::read (msg_t *msg_) @@ -127,37 +115,13 @@ void zmq::session_t::clean_pipes () } } -void zmq::session_t::attach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) -{ - zmq_assert (!pipe_attached); - pipe_attached = true; - - if (pipe_) { - zmq_assert (!pipe); - pipe = pipe_; - pipe->set_event_sink (this); - } - - // If we are already terminating, terminate the pipes straight away. - if (state == terminating) { - if (pipe) { - pipe->terminate (); - register_term_acks (1); - } - } -} - void zmq::session_t::terminated (pipe_t *pipe_) { + // Drop the reference to the deallocated pipe. zmq_assert (pipe == pipe_); - - // If we are in process of being closed, but still waiting for all - // pending messeges being sent, we can terminate here. - if (state == pending) - proceed_with_term (); - pipe = NULL; - if (state == terminating) + + if (terminating) unregister_term_ack (); } @@ -189,7 +153,7 @@ void zmq::session_t::process_attach (i_engine *engine_, // If we are already terminating, we destroy the engine straight away. // Note that we don't have to unplug it before deleting as it's not // yet plugged to the session. - if (state == terminating) { + if (terminating) { if (engine_) delete engine_; return; @@ -209,12 +173,8 @@ void zmq::session_t::process_attach (i_engine *engine_, return; } - // Check whether the required pipe already exists and create it - // if it does not. - if (!pipe_attached) { - zmq_assert (!pipe); - pipe_attached = true; - + // Create the pipe if it does not exist yet. + if (!pipe) { object_t *parents [2] = {this, socket}; pipe_t *pipes [2] = {NULL, NULL}; int hwms [2] = {options.rcvhwm, options.sndhwm}; @@ -226,6 +186,7 @@ void zmq::session_t::process_attach (i_engine *engine_, pipes [0]->set_event_sink (this); // Remember the local end of the pipe. + zmq_assert (!pipe); pipe = pipes [0]; // Ask socket to plug into the remote end of the pipe. @@ -249,43 +210,45 @@ void zmq::session_t::detach () // Send the event to the derived class. detached (); - // Just in case there's only a delimiter in the inbound pipe. + // Just in case there's only a delimiter in the pipe. if (pipe) pipe->check_read (); } void zmq::session_t::process_term (int linger_) { - zmq_assert (state == active); - state = pending; - - // If linger is set to zero, we can terminate the session straight away - // not waiting for the pending messages to be sent. - if (linger_ == 0) { - proceed_with_term (); - return; + // If termination is already underway, do nothing. + if (!terminating) { + + terminating = true; + + // If the termination of the pipe happens before the term command is + // delivered there's nothing much to do. We can proceed with the + // stadard termination immediately. + if (pipe) { + + // We're going to wait till the pipe terminates. + register_term_acks (1); + + // If linger is set to zero, we can ask pipe to terminate without + // waiting for pending messages to be read. + if (linger_ == 0) + pipe->terminate (); + + // If there's finite linger value, set up a timer. + if (linger_ > 0) { + zmq_assert (!has_linger_timer); + add_timer (linger_, linger_timer_id); + has_linger_timer = true; + } + + // In case there's no engine and there's only delimiter in the pipe it + // wouldn't be ever read. Thus we check for it explicitly. + pipe->check_read (); + } } - // If there's finite linger value, set up a timer. - if (linger_ > 0) { - zmq_assert (!has_linger_timer); - add_timer (linger_, linger_timer_id); - has_linger_timer = true; - } - - // If there's no engine and there's only delimiter in the pipe it wouldn't - // be ever read. Thus we check for it explicitly. - if (pipe) - pipe->check_read (); - - // If there's no in pipe, there are no pending messages to send. - // We can proceed with the shutdown straight away. Also, if there is - // pipe, but the delimiter was already processed, we can terminate - // immediately. Alternatively, if the derived session type have - // called 'terminate' we'll finish straight away. - if (delimiter_processed || force_terminate || - (!options.immediate_connect && !pipe)) - proceed_with_term (); + own_t::process_term (0); } void zmq::session_t::timer_event (int id_) @@ -294,7 +257,10 @@ void zmq::session_t::timer_event (int id_) // there are still pending messages to be sent. zmq_assert (id_ == linger_timer_id); has_linger_timer = false; - proceed_with_term (); + + // Ask pipe to terminate even though there may be pending messages in it. + zmq_assert (pipe); + pipe->terminate (); } bool zmq::session_t::has_engine () @@ -314,6 +280,19 @@ void zmq::session_t::unregister_session (const blob_t &name_) void zmq::session_t::terminate () { - force_terminate = true; - own_t::terminate (); + // If termination process is already underway, do nothing. + if (!terminating) { + terminating = true; + + // If the pipe was already terminated, there's nothing much to do. + // If it wasn't, we'll ask it to terminate. + if (pipe) { + + register_term_acks (1); + pipe->terminate (); + } + } + + own_t::terminate (); } + -- cgit v1.2.3