diff options
Diffstat (limited to 'src/session.cpp')
-rw-r--r-- | src/session.cpp | 40 |
1 files changed, 36 insertions, 4 deletions
diff --git a/src/session.cpp b/src/session.cpp index d5c6fdd..aae7e3c 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -28,8 +28,8 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, class socket_base_t *socket_, const options_t &options_) : - own_t (io_thread_), - options (options_), + own_t (io_thread_, options_), + io_object_t (io_thread_), in_pipe (NULL), incomplete_in (false), out_pipe (NULL), @@ -39,6 +39,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, pipes_attached (false), delimiter_processed (false), force_terminate (false), + has_linger_timer (false), state (active) { } @@ -60,6 +61,12 @@ void zmq::session_t::proceed_with_term () zmq_assert (state == pending); state = terminating; + // If there's still a pending linger timer, remove it. + if (has_linger_timer) { + cancel_timer (linger_timer_id); + has_linger_timer = false; + } + if (in_pipe) { register_term_acks (1); in_pipe->terminate (); @@ -69,7 +76,9 @@ void zmq::session_t::proceed_with_term () out_pipe->terminate (); } - own_t::process_term (); + // The session has already waited for the linger period. We don't want + // the child objects to linger any more thus linger is set to zero. + own_t::process_term (0); } bool zmq::session_t::read (::zmq_msg_t *msg_) @@ -271,11 +280,25 @@ void zmq::session_t::detach () in_pipe->check_read (); } -void zmq::session_t::process_term () +void zmq::session_t::process_term (int linger_) { zmq_assert (state == active); state = pending; + // If linger is set to zero, we can terminate the session straight away + // not waiting for the pending messages to be sent. + if (linger_ == 0) { + proceed_with_term (); + return; + } + + // If there's finite linger value, set up a timer. + if (linger_ > 0) { + zmq_assert (!has_linger_timer); + add_timer (linger_, linger_timer_id); + has_linger_timer = true; + } + // If there's no engine and there's only delimiter in the pipe it wouldn't // be ever read. Thus we check for it explicitly. if (in_pipe) @@ -291,6 +314,15 @@ void zmq::session_t::process_term () proceed_with_term (); } +void zmq::session_t::timer_event (int id_) +{ + // Linger period expired. We can proceed with termination even though + // there are still pending messages to be sent. + zmq_assert (id_ == linger_timer_id); + has_linger_timer = false; + proceed_with_term (); +} + bool zmq::session_t::register_session (const blob_t &name_, session_t *session_) { return socket->register_session (name_, session_); |