From 718885fdcd7af797f940078ca8c22aebab93c8bb Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 26 May 2011 11:30:25 +0200 Subject: Pending messages are delivered even if connection doesn't exist yet Bug in previous refactoring fixed. Signed-off-by: Martin Sustrik --- src/session.cpp | 110 ++++++++++++++++++++++++++------------------------------ 1 file changed, 51 insertions(+), 59 deletions(-) (limited to 'src/session.cpp') diff --git a/src/session.cpp b/src/session.cpp index bff452e..5601402 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -31,7 +31,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, io_object_t (io_thread_), pipe (NULL), incomplete_in (false), - terminating (false), + pending (false), engine (NULL), socket (socket_), io_thread (io_thread_), @@ -121,8 +121,11 @@ void zmq::session_t::terminated (pipe_t *pipe_) zmq_assert (pipe == pipe_); pipe = NULL; - if (terminating) - unregister_term_ack (); + // If we are waiting for pending messages to be sent, at this point + // we are sure that there will be no more messages and we can proceed + // with termination safely. + if (pending) + proceed_with_term (); } void zmq::session_t::read_activated (pipe_t *pipe_) @@ -150,15 +153,6 @@ void zmq::session_t::process_plug () void zmq::session_t::process_attach (i_engine *engine_, const blob_t &peer_identity_) { - // If we are already terminating, we destroy the engine straight away. - // Note that we don't have to unplug it before deleting as it's not - // yet plugged to the session. - if (terminating) { - if (engine_) - delete engine_; - return; - } - // If some other object (e.g. init) notifies us that the connection failed // without creating an engine we need to start the reconnection process. if (!engine_) { @@ -217,37 +211,52 @@ void zmq::session_t::detach () void zmq::session_t::process_term (int linger_) { - // If termination is already underway, do nothing. - if (!terminating) { - - terminating = true; - - // If the termination of the pipe happens before the term command is - // delivered there's nothing much to do. We can proceed with the - // stadard termination immediately. - if (pipe) { - - // We're going to wait till the pipe terminates. - register_term_acks (1); - - // If linger is set to zero, we can ask pipe to terminate without - // waiting for pending messages to be read. - if (linger_ == 0) - pipe->terminate (); - - // If there's finite linger value, set up a timer. - if (linger_ > 0) { - zmq_assert (!has_linger_timer); - add_timer (linger_, linger_timer_id); - has_linger_timer = true; - } - - // In case there's no engine and there's only delimiter in the pipe it - // wouldn't be ever read. Thus we check for it explicitly. - pipe->check_read (); - } + zmq_assert (!pending); + + // If the termination of the pipe happens before the term command is + // delivered there's nothing much to do. We can proceed with the + // stadard termination immediately. + if (!pipe) { + proceed_with_term (); + return; + } + + // If linger is set to zero, we can ask pipe to terminate without + // waiting for pending messages to be read. + if (linger_ == 0) { + proceed_with_term (); + return; + } + + pending = true; + + // If there's finite linger value, delay the termination. + // If linger is infinite (negative) we don't even have to set + // the timer. + if (linger_ > 0) { + zmq_assert (!has_linger_timer); + add_timer (linger_, linger_timer_id); + has_linger_timer = true; } + // In case there's no engine and there's only delimiter in the + // pipe it wouldn't be ever read. Thus we check for it explicitly. + pipe->check_read (); +} + +void zmq::session_t::proceed_with_term () +{ + // The pending phase have just ended. + pending = false; + + // If there's pipe attached to the session, we have to wait till it + // terminates. + if (pipe) { + register_term_acks (1); + pipe->terminate (); + } + + // Continue with standard termination. own_t::process_term (0); } @@ -260,7 +269,7 @@ void zmq::session_t::timer_event (int id_) // Ask pipe to terminate even though there may be pending messages in it. zmq_assert (pipe); - pipe->terminate (); + proceed_with_term (); } bool zmq::session_t::has_engine () @@ -278,21 +287,4 @@ void zmq::session_t::unregister_session (const blob_t &name_) socket->unregister_session (name_); } -void zmq::session_t::terminate () -{ - // If termination process is already underway, do nothing. - if (!terminating) { - terminating = true; - - // If the pipe was already terminated, there's nothing much to do. - // If it wasn't, we'll ask it to terminate. - if (pipe) { - - register_term_acks (1); - pipe->terminate (); - } - } - - own_t::terminate (); -} -- cgit v1.2.3