summaryrefslogtreecommitdiff
path: root/src/session.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/session.cpp')
-rw-r--r--src/session.cpp40
1 files changed, 36 insertions, 4 deletions
diff --git a/src/session.cpp b/src/session.cpp
index d5c6fdd..aae7e3c 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -28,8 +28,8 @@
zmq::session_t::session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_) :
- own_t (io_thread_),
- options (options_),
+ own_t (io_thread_, options_),
+ io_object_t (io_thread_),
in_pipe (NULL),
incomplete_in (false),
out_pipe (NULL),
@@ -39,6 +39,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
pipes_attached (false),
delimiter_processed (false),
force_terminate (false),
+ has_linger_timer (false),
state (active)
{
}
@@ -60,6 +61,12 @@ void zmq::session_t::proceed_with_term ()
zmq_assert (state == pending);
state = terminating;
+ // If there's still a pending linger timer, remove it.
+ if (has_linger_timer) {
+ cancel_timer (linger_timer_id);
+ has_linger_timer = false;
+ }
+
if (in_pipe) {
register_term_acks (1);
in_pipe->terminate ();
@@ -69,7 +76,9 @@ void zmq::session_t::proceed_with_term ()
out_pipe->terminate ();
}
- own_t::process_term ();
+ // The session has already waited for the linger period. We don't want
+ // the child objects to linger any more thus linger is set to zero.
+ own_t::process_term (0);
}
bool zmq::session_t::read (::zmq_msg_t *msg_)
@@ -271,11 +280,25 @@ void zmq::session_t::detach ()
in_pipe->check_read ();
}
-void zmq::session_t::process_term ()
+void zmq::session_t::process_term (int linger_)
{
zmq_assert (state == active);
state = pending;
+ // If linger is set to zero, we can terminate the session straight away
+ // not waiting for the pending messages to be sent.
+ if (linger_ == 0) {
+ proceed_with_term ();
+ return;
+ }
+
+ // If there's finite linger value, set up a timer.
+ if (linger_ > 0) {
+ zmq_assert (!has_linger_timer);
+ add_timer (linger_, linger_timer_id);
+ has_linger_timer = true;
+ }
+
// If there's no engine and there's only delimiter in the pipe it wouldn't
// be ever read. Thus we check for it explicitly.
if (in_pipe)
@@ -291,6 +314,15 @@ void zmq::session_t::process_term ()
proceed_with_term ();
}
+void zmq::session_t::timer_event (int id_)
+{
+ // Linger period expired. We can proceed with termination even though
+ // there are still pending messages to be sent.
+ zmq_assert (id_ == linger_timer_id);
+ has_linger_timer = false;
+ proceed_with_term ();
+}
+
bool zmq::session_t::register_session (const blob_t &name_, session_t *session_)
{
return socket->register_session (name_, session_);