From 7a685b0f88386b11c4c1fcbb45324aa28f4e2eac Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 8 Oct 2010 17:23:21 +0200 Subject: Clean-up of session termination process Specifically, shutdown of child objects is initiated *before* termination handshake with socket object. Signed-off-by: Martin Sustrik --- src/session.cpp | 111 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 57 insertions(+), 54 deletions(-) (limited to 'src/session.cpp') diff --git a/src/session.cpp b/src/session.cpp index 4c448af..f39cf05 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -35,9 +35,9 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, engine (NULL), socket (socket_), io_thread (io_thread_), - attach_processed (false), - term_processed (false), - finalised (false) + delimiter_processed (false), + force_terminate (false), + state (active) { } @@ -50,12 +50,24 @@ zmq::session_t::~session_t () engine->terminate (); } -void zmq::session_t::terminate () +void zmq::session_t::proceed_with_term () { - if (in_pipe) + if (state == terminating) + return; + + zmq_assert (state == pending); + state = terminating; + + if (in_pipe) { + register_term_acks (1); in_pipe->terminate (); - if (out_pipe) + } + if (out_pipe) { + register_term_acks (1); out_pipe->terminate (); + } + + own_t::process_term (); } bool zmq::session_t::read (::zmq_msg_t *msg_) @@ -125,46 +137,44 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_, } // If we are already terminating, terminate the pipes straight away. - if (finalised) { + if (state == terminating) { if (in_pipe) { - register_term_acks (1); in_pipe->terminate (); + register_term_acks (1); } if (out_pipe) { - register_term_acks (1); out_pipe->terminate (); + register_term_acks (1); } - return; } +} - attach_processed = true; - finalise (); +void zmq::session_t::delimited (reader_t *pipe_) +{ + zmq_assert (in_pipe == pipe_); + zmq_assert (!delimiter_processed); + delimiter_processed = true; + + // If we are in process of being closed, but still waiting for all + // pending messeges being sent, we can terminate here. + if (state == pending) + proceed_with_term (); } void zmq::session_t::terminated (reader_t *pipe_) { zmq_assert (in_pipe == pipe_); in_pipe = NULL; - - if (finalised) { + if (state == terminating) unregister_term_ack (); - return; - } - - finalise (); } void zmq::session_t::terminated (writer_t *pipe_) { zmq_assert (out_pipe == pipe_); out_pipe = NULL; - - if (finalised) { + if (state == terminating) unregister_term_ack (); - return; - } - - finalise (); } void zmq::session_t::activated (reader_t *pipe_) @@ -186,27 +196,6 @@ void zmq::session_t::process_plug () { } -void zmq::session_t::finalise () -{ - // There may be delimiter waiting in the inbound pipe, never to be read - // because the connection cannot be established. In order to terminate - // decently in such case, do check_read which will in turn start the pipe - // termination process if there's delimiter in it. - if (in_pipe) - in_pipe->check_read (); - - // If all conditions are met, proceed with termination: - // 1. Owner object already asked us to terminate. - // 2. The pipes were already attached to the session. - // 3. Both pipes have already terminated. Note that inbound pipe - // is terminated after delimiter is read, i.e. all messages - // were already sent to the wire. - if (term_processed && attach_processed && !in_pipe && !out_pipe) { - finalised = true; - own_t::process_term (); - } -} - void zmq::session_t::process_attach (i_engine *engine_, const blob_t &peer_identity_) { @@ -219,7 +208,7 @@ void zmq::session_t::process_attach (i_engine *engine_, } // If we are already terminating, we destroy the engine straight away. - if (finalised) { + if (state == terminating) { delete engine; return; } @@ -229,18 +218,19 @@ void zmq::session_t::process_attach (i_engine *engine_, reader_t *socket_reader = NULL; writer_t *socket_writer = NULL; + // Create the pipes, if required. if (options.requires_in && !out_pipe) { create_pipe (socket, this, options.hwm, options.swap, &socket_reader, &out_pipe); out_pipe->set_event_sink (this); } - if (options.requires_out && !in_pipe) { create_pipe (this, socket, options.hwm, options.swap, &in_pipe, &socket_writer); in_pipe->set_event_sink (this); } + // Bind the pipes to the socket object. if (socket_reader || socket_writer) send_bind (socket, socket_reader, socket_writer, peer_identity_); @@ -250,8 +240,6 @@ void zmq::session_t::process_attach (i_engine *engine_, engine = engine_; engine->plug (io_thread, this); - attach_processed = true; - // Trigger the notfication about the attachment. attached (peer_identity_); } @@ -266,11 +254,21 @@ void zmq::session_t::detach () void zmq::session_t::process_term () { - // Here we are pugging into the own_t's termination mechanism. - // The goal is to postpone the termination till all the pending messages - // are sent to the peer. - term_processed = true; - finalise (); + zmq_assert (state == active); + state = pending; + + // If there's no engine and there's only delimiter in the pipe it wouldn't + // be ever read. Thus we check for it explicitly. + if (in_pipe) + in_pipe->check_read (); + + // If there's no in pipe there are no pending messages to send. + // We can proceed with the shutdown straight away. Also, if there is + // inbound pipe, but the delimiter was already processed, we can + // terminate immediately. Alternatively, if the derived session type have + // called 'terminate' we'll finish straight away. + if (!options.requires_out || delimiter_processed || force_terminate) + proceed_with_term (); } bool zmq::session_t::register_session (const blob_t &name_, session_t *session_) @@ -291,3 +289,8 @@ void zmq::session_t::detached () { } +void zmq::session_t::terminate () +{ + force_terminate = true; + own_t::terminate (); +} -- cgit v1.2.3