From a24a7c15a824bb48da38809bff9416673dc5a176 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 31 May 2011 14:36:51 +0200 Subject: Session termination induced by socket fixed Signed-off-by: Martin Sustrik --- src/pipe.cpp | 18 ++++++++++++------ src/pipe.hpp | 5 +++-- src/session.cpp | 21 ++++++--------------- src/socket_base.cpp | 4 ++-- 4 files changed, 23 insertions(+), 25 deletions(-) (limited to 'src') diff --git a/src/pipe.cpp b/src/pipe.cpp index fd7223c..26f7d85 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -301,8 +301,11 @@ void zmq::pipe_t::process_pipe_term_ack () delete this; } -void zmq::pipe_t::terminate () +void zmq::pipe_t::terminate (bool delay_) { + // Overload the value specified at pipe creation. + delay = delay_; + // If terminate was already called, we can ignore the duplicit invocation. if (state == terminated || state == double_terminated) return; @@ -321,9 +324,13 @@ void zmq::pipe_t::terminate () // There are still pending messages available, but the user calls // 'terminate'. We can act as if all the pending messages were read. - else if (state == pending) { - send_pipe_term_ack (peer); - state = terminating; + else if (state == pending && !delay) { + send_pipe_term_ack (peer); + state = terminating; + } + + // If there are pending messages still availabe, do nothing. + else if (state == pending && delay) { } // We've already got delimiter, but not term command yet. We can ignore @@ -338,8 +345,7 @@ void zmq::pipe_t::terminate () else zmq_assert (false); - // Stop inbound and outbound flow of messages. - in_active = false; + // Stop outbound flow of messages. out_active = false; // Rollback any unfinished outbound messages. diff --git a/src/pipe.hpp b/src/pipe.hpp index bf34a83..d3bf866 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -94,8 +94,9 @@ namespace zmq // Ask pipe to terminate. The termination will happen asynchronously // and user will be notified about actual deallocation by 'terminated' - // event. - void terminate (); + // event. If delay is true, the pending messages will be processed + // before actual shutdown. + void terminate (bool delay_); private: diff --git a/src/session.cpp b/src/session.cpp index c9f4fdb..8f29248 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -228,13 +228,6 @@ void zmq::session_t::process_term (int linger_) return; } - // If linger is set to zero, we can ask pipe to terminate without - // waiting for pending messages to be read. - if (linger_ == 0) { - proceed_with_term (); - return; - } - pending = true; // If there's finite linger value, delay the termination. @@ -246,6 +239,11 @@ void zmq::session_t::process_term (int linger_) has_linger_timer = true; } + // Start pipe termination process. Delay the termination till all messages + // are processed in case the linger time is non-zero. + pipe->terminate (linger_ != 0); + + // TODO: Should this go into pipe_t::terminate ? // In case there's no engine and there's only delimiter in the // pipe it wouldn't be ever read. Thus we check for it explicitly. pipe->check_read (); @@ -256,13 +254,6 @@ void zmq::session_t::proceed_with_term () // The pending phase have just ended. pending = false; - // If there's pipe attached to the session, we have to wait till it - // terminates. - if (pipe) { - register_term_acks (1); - pipe->terminate (); - } - // Continue with standard termination. own_t::process_term (0); } @@ -276,7 +267,7 @@ void zmq::session_t::timer_event (int id_) // Ask pipe to terminate even though there may be pending messages in it. zmq_assert (pipe); - proceed_with_term (); + pipe->terminate (false); } bool zmq::session_t::has_engine () diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 59e2653..2b1d8af 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -233,7 +233,7 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, // straight away. if (is_terminating ()) { register_term_acks (1); - pipe_->terminate (); + pipe_->terminate (false); } } @@ -740,7 +740,7 @@ void zmq::socket_base_t::process_term (int linger_) // Ask all attached pipes to terminate. for (pipes_t::size_type i = 0; i != pipes.size (); ++i) - pipes [i]->terminate (); + pipes [i]->terminate (false); register_term_acks (pipes.size ()); // Continue the termination process immediately. -- cgit v1.2.3