summaryrefslogtreecommitdiff
path: root/src/session.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-10-16 10:53:29 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-10-16 10:53:29 +0200
commit0a03e86e9547fa7c221b316a5a943467adea3dfd (patch)
treef47b0cd7d3c91c59de419506f8d66c27e327e41c /src/session.cpp
parenta1474e305762d32df2b79300d124aac7fa0181c8 (diff)
ZMQ_LINGER socket option added.
1. ZMQ_LINGER option can be set/get 2. options are part of own_t base class rather than being declared separately by individual objects 3. Linger option is propagated with "term" command so that the newest value of it is used rather than the stored old one. 4. Session sets the linger timer if needed and terminates as soon as it expires. 5. Corresponding documentation updated. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/session.cpp')
-rw-r--r--src/session.cpp40
1 files changed, 36 insertions, 4 deletions
diff --git a/src/session.cpp b/src/session.cpp
index d5c6fdd..aae7e3c 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -28,8 +28,8 @@
zmq::session_t::session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_) :
- own_t (io_thread_),
- options (options_),
+ own_t (io_thread_, options_),
+ io_object_t (io_thread_),
in_pipe (NULL),
incomplete_in (false),
out_pipe (NULL),
@@ -39,6 +39,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
pipes_attached (false),
delimiter_processed (false),
force_terminate (false),
+ has_linger_timer (false),
state (active)
{
}
@@ -60,6 +61,12 @@ void zmq::session_t::proceed_with_term ()
zmq_assert (state == pending);
state = terminating;
+ // If there's still a pending linger timer, remove it.
+ if (has_linger_timer) {
+ cancel_timer (linger_timer_id);
+ has_linger_timer = false;
+ }
+
if (in_pipe) {
register_term_acks (1);
in_pipe->terminate ();
@@ -69,7 +76,9 @@ void zmq::session_t::proceed_with_term ()
out_pipe->terminate ();
}
- own_t::process_term ();
+ // The session has already waited for the linger period. We don't want
+ // the child objects to linger any more thus linger is set to zero.
+ own_t::process_term (0);
}
bool zmq::session_t::read (::zmq_msg_t *msg_)
@@ -271,11 +280,25 @@ void zmq::session_t::detach ()
in_pipe->check_read ();
}
-void zmq::session_t::process_term ()
+void zmq::session_t::process_term (int linger_)
{
zmq_assert (state == active);
state = pending;
+ // If linger is set to zero, we can terminate the session straight away
+ // not waiting for the pending messages to be sent.
+ if (linger_ == 0) {
+ proceed_with_term ();
+ return;
+ }
+
+ // If there's finite linger value, set up a timer.
+ if (linger_ > 0) {
+ zmq_assert (!has_linger_timer);
+ add_timer (linger_, linger_timer_id);
+ has_linger_timer = true;
+ }
+
// If there's no engine and there's only delimiter in the pipe it wouldn't
// be ever read. Thus we check for it explicitly.
if (in_pipe)
@@ -291,6 +314,15 @@ void zmq::session_t::process_term ()
proceed_with_term ();
}
+void zmq::session_t::timer_event (int id_)
+{
+ // Linger period expired. We can proceed with termination even though
+ // there are still pending messages to be sent.
+ zmq_assert (id_ == linger_timer_id);
+ has_linger_timer = false;
+ proceed_with_term ();
+}
+
bool zmq::session_t::register_session (const blob_t &name_, session_t *session_)
{
return socket->register_session (name_, session_);