diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-05-31 14:36:51 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-05-31 14:36:51 +0200 |
commit | a24a7c15a824bb48da38809bff9416673dc5a176 (patch) | |
tree | f952e4d06004f2ec0683fa47b5418b8b805ea799 /src | |
parent | 0b59866a84f733e5a53b0d2f32570581691747ef (diff) |
Session termination induced by socket fixed
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/pipe.cpp | 18 | ||||
-rw-r--r-- | src/pipe.hpp | 5 | ||||
-rw-r--r-- | src/session.cpp | 21 | ||||
-rw-r--r-- | src/socket_base.cpp | 4 |
4 files changed, 23 insertions, 25 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index fd7223c..26f7d85 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -301,8 +301,11 @@ void zmq::pipe_t::process_pipe_term_ack () delete this; } -void zmq::pipe_t::terminate () +void zmq::pipe_t::terminate (bool delay_) { + // Overload the value specified at pipe creation. + delay = delay_; + // If terminate was already called, we can ignore the duplicit invocation. if (state == terminated || state == double_terminated) return; @@ -321,9 +324,13 @@ void zmq::pipe_t::terminate () // There are still pending messages available, but the user calls // 'terminate'. We can act as if all the pending messages were read. - else if (state == pending) { - send_pipe_term_ack (peer); - state = terminating; + else if (state == pending && !delay) { + send_pipe_term_ack (peer); + state = terminating; + } + + // If there are pending messages still availabe, do nothing. + else if (state == pending && delay) { } // We've already got delimiter, but not term command yet. We can ignore @@ -338,8 +345,7 @@ void zmq::pipe_t::terminate () else zmq_assert (false); - // Stop inbound and outbound flow of messages. - in_active = false; + // Stop outbound flow of messages. out_active = false; // Rollback any unfinished outbound messages. diff --git a/src/pipe.hpp b/src/pipe.hpp index bf34a83..d3bf866 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -94,8 +94,9 @@ namespace zmq // Ask pipe to terminate. The termination will happen asynchronously // and user will be notified about actual deallocation by 'terminated' - // event. - void terminate (); + // event. If delay is true, the pending messages will be processed + // before actual shutdown. + void terminate (bool delay_); private: diff --git a/src/session.cpp b/src/session.cpp index c9f4fdb..8f29248 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -228,13 +228,6 @@ void zmq::session_t::process_term (int linger_) return; } - // If linger is set to zero, we can ask pipe to terminate without - // waiting for pending messages to be read. - if (linger_ == 0) { - proceed_with_term (); - return; - } - pending = true; // If there's finite linger value, delay the termination. @@ -246,6 +239,11 @@ void zmq::session_t::process_term (int linger_) has_linger_timer = true; } + // Start pipe termination process. Delay the termination till all messages + // are processed in case the linger time is non-zero. + pipe->terminate (linger_ != 0); + + // TODO: Should this go into pipe_t::terminate ? // In case there's no engine and there's only delimiter in the // pipe it wouldn't be ever read. Thus we check for it explicitly. pipe->check_read (); @@ -256,13 +254,6 @@ void zmq::session_t::proceed_with_term () // The pending phase have just ended. pending = false; - // If there's pipe attached to the session, we have to wait till it - // terminates. - if (pipe) { - register_term_acks (1); - pipe->terminate (); - } - // Continue with standard termination. own_t::process_term (0); } @@ -276,7 +267,7 @@ void zmq::session_t::timer_event (int id_) // Ask pipe to terminate even though there may be pending messages in it. zmq_assert (pipe); - proceed_with_term (); + pipe->terminate (false); } bool zmq::session_t::has_engine () diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 59e2653..2b1d8af 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -233,7 +233,7 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, // straight away. if (is_terminating ()) { register_term_acks (1); - pipe_->terminate (); + pipe_->terminate (false); } } @@ -740,7 +740,7 @@ void zmq::socket_base_t::process_term (int linger_) // Ask all attached pipes to terminate. for (pipes_t::size_type i = 0; i != pipes.size (); ++i) - pipes [i]->terminate (); + pipes [i]->terminate (false); register_term_acks (pipes.size ()); // Continue the termination process immediately. |