diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/command.hpp | 1 | ||||
-rw-r--r-- | src/object.cpp | 7 | ||||
-rw-r--r-- | src/object.hpp | 4 | ||||
-rw-r--r-- | src/options.cpp | 18 | ||||
-rw-r--r-- | src/options.hpp | 3 | ||||
-rw-r--r-- | src/own.cpp | 20 | ||||
-rw-r--r-- | src/own.hpp | 8 | ||||
-rw-r--r-- | src/pair.cpp | 4 | ||||
-rw-r--r-- | src/pair.hpp | 2 | ||||
-rw-r--r-- | src/pub.cpp | 4 | ||||
-rw-r--r-- | src/pub.hpp | 2 | ||||
-rw-r--r-- | src/pull.cpp | 4 | ||||
-rw-r--r-- | src/pull.hpp | 2 | ||||
-rw-r--r-- | src/push.cpp | 4 | ||||
-rw-r--r-- | src/push.hpp | 2 | ||||
-rw-r--r-- | src/session.cpp | 40 | ||||
-rw-r--r-- | src/session.hpp | 17 | ||||
-rw-r--r-- | src/socket_base.cpp | 4 | ||||
-rw-r--r-- | src/socket_base.hpp | 6 | ||||
-rw-r--r-- | src/sub.cpp | 4 | ||||
-rw-r--r-- | src/sub.hpp | 2 | ||||
-rw-r--r-- | src/xrep.cpp | 4 | ||||
-rw-r--r-- | src/xrep.hpp | 2 | ||||
-rw-r--r-- | src/xreq.cpp | 4 | ||||
-rw-r--r-- | src/xreq.hpp | 2 | ||||
-rw-r--r-- | src/zmq_connecter.cpp | 5 | ||||
-rw-r--r-- | src/zmq_connecter.hpp | 4 | ||||
-rw-r--r-- | src/zmq_init.cpp | 3 | ||||
-rw-r--r-- | src/zmq_init.hpp | 4 | ||||
-rw-r--r-- | src/zmq_listener.cpp | 7 | ||||
-rw-r--r-- | src/zmq_listener.hpp | 6 |
31 files changed, 125 insertions, 74 deletions
diff --git a/src/command.hpp b/src/command.hpp index c64ca92..0c094b9 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -109,6 +109,7 @@ namespace zmq // Sent by socket to I/O object to start its shutdown. struct { + int linger; } term; // Sent by I/O object to the socket to acknowledge it has diff --git a/src/object.cpp b/src/object.cpp index 90c015a..fc08785 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -106,7 +106,7 @@ void zmq::object_t::process_command (command_t &cmd_) break; case command_t::term: - process_term (); + process_term (cmd_.args.term.linger); break; case command_t::term_ack: @@ -312,7 +312,7 @@ void zmq::object_t::send_term_req (own_t *destination_, send_command (cmd); } -void zmq::object_t::send_term (own_t *destination_) +void zmq::object_t::send_term (own_t *destination_, int linger_) { command_t cmd; #if defined ZMQ_MAKE_VALGRIND_HAPPY @@ -320,6 +320,7 @@ void zmq::object_t::send_term (own_t *destination_) #endif cmd.destination = destination_; cmd.type = command_t::term; + cmd.args.term.linger = linger_; send_command (cmd); } @@ -386,7 +387,7 @@ void zmq::object_t::process_term_req (own_t *object_) zmq_assert (false); } -void zmq::object_t::process_term () +void zmq::object_t::process_term (int linger_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index bc1b325..9580556 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -80,7 +80,7 @@ namespace zmq void send_pipe_term_ack (class reader_t *destination_); void send_term_req (class own_t *destination_, class own_t *object_); - void send_term (class own_t *destination_); + void send_term (class own_t *destination_, int linger_); void send_term_ack (class own_t *destination_); // These handlers can be overloaded by the derived objects. They are @@ -97,7 +97,7 @@ namespace zmq virtual void process_pipe_term (); virtual void process_pipe_term_ack (); virtual void process_term_req (class own_t *object_); - virtual void process_term (); + virtual void process_term (int linger_); virtual void process_term_ack (); // Special handler called after a command that requires a seqnum diff --git a/src/options.cpp b/src/options.cpp index e644a9b..c9e330f 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -34,6 +34,7 @@ zmq::options_t::options_t () : sndbuf (0), rcvbuf (0), type (-1), + linger (-1), requires_in (false), requires_out (false), immediate_connect (true) @@ -128,6 +129,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, } rcvbuf = *((uint64_t*) optval_); return 0; + + case ZMQ_LINGER: + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + linger = *((int*) optval_); + return 0; } errno = EINVAL; @@ -138,6 +147,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) { switch (option_) { + case ZMQ_LINGER: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = linger; + *optvallen_ = sizeof (int); + return 0; + case ZMQ_TYPE: if (*optvallen_ < sizeof (int)) { errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index ea0c841..9b57ab6 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -54,6 +54,9 @@ namespace zmq // Socket type. int type; + // Linger time, in milliseconds. + int linger; + // These options are never set by the user directly. Instead they are // provided by the specific socket type. bool requires_in; diff --git a/src/own.cpp b/src/own.cpp index 12f50bf..c91650d 100644 --- a/src/own.cpp +++ b/src/own.cpp @@ -31,8 +31,9 @@ zmq::own_t::own_t (class ctx_t *parent_, uint32_t slot_) : { } -zmq::own_t::own_t (io_thread_t *io_thread_) : +zmq::own_t::own_t (io_thread_t *io_thread_, const options_t &options_) : object_t (io_thread_), + options (options_), terminating (false), sent_seqnum (0), processed_seqnum (0), @@ -113,16 +114,19 @@ void zmq::own_t::process_term_req (own_t *object_) owned.erase (it); register_term_acks (1); - send_term (object_); + + // Note that this object is the root of the (partial shutdown) thus, its + // value of linger is used, rather than the value stored by the children. + send_term (object_, options.linger); } void zmq::own_t::process_own (own_t *object_) { // If the object is already being shut down, new owned objects are - // immediately asked to terminate. + // immediately asked to terminate. Note that linger is set to zero. if (terminating) { register_term_acks (1); - send_term (object_); + send_term (object_, 0); return; } @@ -140,7 +144,7 @@ void zmq::own_t::terminate () // As for the root of the ownership tree, there's noone to terminate it, // so it has to terminate itself. if (!owner) { - process_term (); + process_term (options.linger); return; } @@ -148,14 +152,14 @@ void zmq::own_t::terminate () send_term_req (owner, this); } -void zmq::own_t::process_term () +void zmq::own_t::process_term (int linger_) { // Double termination should never happen. zmq_assert (!terminating); - // Send termination request to all owned objects. + // Send termination request to all owned objects. for (owned_t::iterator it = owned.begin (); it != owned.end (); it++) - send_term (*it); + send_term (*it, linger_); register_term_acks (owned.size ()); owned.clear (); diff --git a/src/own.hpp b/src/own.hpp index 6b6f7bf..18f7251 100644 --- a/src/own.hpp +++ b/src/own.hpp @@ -24,6 +24,7 @@ #include <algorithm> #include "object.hpp" +#include "options.hpp" #include "atomic_counter.hpp" #include "stdint.hpp" @@ -45,7 +46,7 @@ namespace zmq own_t (class ctx_t *parent_, uint32_t slot_); // The object is living within I/O thread. - own_t (class io_thread_t *io_thread_); + own_t (class io_thread_t *io_thread_, const options_t &options_); // When another owned object wants to send command to this object // it calls this function to let it know it should not shut down @@ -83,12 +84,15 @@ namespace zmq // Term handler is protocted rather than private so that it can // be intercepted by the derived class. This is useful to add custom // steps to the beginning of the termination process. - void process_term (); + void process_term (int linger_); // A place to hook in when phyicallal destruction of the object // is to be delayed. virtual void process_destroy (); + // Socket options associated with this object. + options_t options; + private: // Set owner of the object diff --git a/src/pair.cpp b/src/pair.cpp index 492ec55..8ee29cf 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -86,7 +86,7 @@ void zmq::pair_t::delimited (reader_t *pipe_) { } -void zmq::pair_t::process_term () +void zmq::pair_t::process_term (int linger_) { terminating = true; @@ -100,7 +100,7 @@ void zmq::pair_t::process_term () outpipe->terminate (); } - socket_base_t::process_term (); + socket_base_t::process_term (linger_); } void zmq::pair_t::activated (class reader_t *pipe_) diff --git a/src/pair.hpp b/src/pair.hpp index 030fb97..a14544f 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -56,7 +56,7 @@ namespace zmq private: // Hook into termination process. - void process_term (); + void process_term (int linger_); class reader_t *inpipe; class writer_t *outpipe; diff --git a/src/pub.cpp b/src/pub.cpp index b1a1239..6290e9a 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -56,7 +56,7 @@ void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, } } -void zmq::pub_t::process_term () +void zmq::pub_t::process_term (int linger_) { terminating = true; @@ -68,7 +68,7 @@ void zmq::pub_t::process_term () register_term_acks (pipes.size ()); // Continue with the termination immediately. - socket_base_t::process_term (); + socket_base_t::process_term (linger_); } void zmq::pub_t::activated (writer_t *pipe_) diff --git a/src/pub.hpp b/src/pub.hpp index 8ff192e..6e02be7 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -47,7 +47,7 @@ namespace zmq private: // Hook into the termination process. - void process_term (); + void process_term (int linger_); // Write the message to the pipe. Make the pipe inactive if writing // fails. In such a case false is returned. diff --git a/src/pull.cpp b/src/pull.cpp index c5f6a8e..5bccf06 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -42,10 +42,10 @@ void zmq::pull_t::xattach_pipes (class reader_t *inpipe_, fq.attach (inpipe_); } -void zmq::pull_t::process_term () +void zmq::pull_t::process_term (int linger_) { fq.terminate (); - socket_base_t::process_term (); + socket_base_t::process_term (linger_); } int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_) diff --git a/src/pull.hpp b/src/pull.hpp index 1b53e3b..d80bf60 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -44,7 +44,7 @@ namespace zmq private: // Hook into the termination process. - void process_term (); + void process_term (int linger_); // Fair queueing object for inbound pipes. fq_t fq; diff --git a/src/push.cpp b/src/push.cpp index f48f7bb..4f3fa5b 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -43,10 +43,10 @@ void zmq::push_t::xattach_pipes (class reader_t *inpipe_, lb.attach (outpipe_); } -void zmq::push_t::process_term () +void zmq::push_t::process_term (int linger_) { lb.terminate (); - socket_base_t::process_term (); + socket_base_t::process_term (linger_); } int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_) diff --git a/src/push.hpp b/src/push.hpp index 29a1a1a..ccc98f9 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -44,7 +44,7 @@ namespace zmq private: // Hook into the termination process. - void process_term (); + void process_term (int linger_); // Load balancer managing the outbound pipes. lb_t lb; diff --git a/src/session.cpp b/src/session.cpp index d5c6fdd..aae7e3c 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -28,8 +28,8 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, class socket_base_t *socket_, const options_t &options_) : - own_t (io_thread_), - options (options_), + own_t (io_thread_, options_), + io_object_t (io_thread_), in_pipe (NULL), incomplete_in (false), out_pipe (NULL), @@ -39,6 +39,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, pipes_attached (false), delimiter_processed (false), force_terminate (false), + has_linger_timer (false), state (active) { } @@ -60,6 +61,12 @@ void zmq::session_t::proceed_with_term () zmq_assert (state == pending); state = terminating; + // If there's still a pending linger timer, remove it. + if (has_linger_timer) { + cancel_timer (linger_timer_id); + has_linger_timer = false; + } + if (in_pipe) { register_term_acks (1); in_pipe->terminate (); @@ -69,7 +76,9 @@ void zmq::session_t::proceed_with_term () out_pipe->terminate (); } - own_t::process_term (); + // The session has already waited for the linger period. We don't want + // the child objects to linger any more thus linger is set to zero. + own_t::process_term (0); } bool zmq::session_t::read (::zmq_msg_t *msg_) @@ -271,11 +280,25 @@ void zmq::session_t::detach () in_pipe->check_read (); } -void zmq::session_t::process_term () +void zmq::session_t::process_term (int linger_) { zmq_assert (state == active); state = pending; + // If linger is set to zero, we can terminate the session straight away + // not waiting for the pending messages to be sent. + if (linger_ == 0) { + proceed_with_term (); + return; + } + + // If there's finite linger value, set up a timer. + if (linger_ > 0) { + zmq_assert (!has_linger_timer); + add_timer (linger_, linger_timer_id); + has_linger_timer = true; + } + // If there's no engine and there's only delimiter in the pipe it wouldn't // be ever read. Thus we check for it explicitly. if (in_pipe) @@ -291,6 +314,15 @@ void zmq::session_t::process_term () proceed_with_term (); } +void zmq::session_t::timer_event (int id_) +{ + // Linger period expired. We can proceed with termination even though + // there are still pending messages to be sent. + zmq_assert (id_ == linger_timer_id); + has_linger_timer = false; + proceed_with_term (); +} + bool zmq::session_t::register_session (const blob_t &name_, session_t *session_) { return socket->register_session (name_, session_); diff --git a/src/session.hpp b/src/session.hpp index 0f90e80..8adda5e 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -22,7 +22,7 @@ #include "own.hpp" #include "i_inout.hpp" -#include "options.hpp" +#include "io_object.hpp" #include "blob.hpp" #include "pipe.hpp" @@ -31,6 +31,7 @@ namespace zmq class session_t : public own_t, + public io_object_t, public i_inout, public i_reader_events, public i_writer_events @@ -79,16 +80,16 @@ namespace zmq ~session_t (); - // Inherited socket options. These are visible to all session classes. - options_t options; - private: // Handlers for incoming commands. void process_plug (); void process_attach (struct i_engine *engine_, const blob_t &peer_identity_); - void process_term (); + void process_term (int linger_); + + // i_poll_events handlers. + void timer_event (int id_); // Remove any half processed messages. Flush unflushed messages. // Call this function when engine disconnect to get rid of leftovers. @@ -127,6 +128,12 @@ namespace zmq // pending messages in the inbound pipe. bool force_terminate; + // ID of the linger timer + enum {linger_timer_id = 0x20}; + + // True is linger timer is running. + bool has_linger_timer; + enum { active, pending, diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 2293701..c1d210d 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -681,7 +681,7 @@ void zmq::socket_base_t::process_unplug () { } -void zmq::socket_base_t::process_term () +void zmq::socket_base_t::process_term (int linger_) { // Unregister all inproc endpoints associated with this socket. // Doing this we make sure that no new pipes from other sockets (inproc) @@ -689,7 +689,7 @@ void zmq::socket_base_t::process_term () unregister_endpoints (this); // Continue the termination process immediately. - own_t::process_term (); + own_t::process_term (linger_); } void zmq::socket_base_t::process_destroy () diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 4a72a1f..5d083ca 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -28,7 +28,6 @@ #include "own.hpp" #include "array.hpp" #include "mutex.hpp" -#include "options.hpp" #include "stdint.hpp" #include "atomic_counter.hpp" #include "signaler.hpp" @@ -111,13 +110,10 @@ namespace zmq virtual bool xhas_in (); virtual int xrecv (zmq_msg_t *msg_, int options_); - // Socket options. - options_t options; - // We are declaring termination handler as protected so that // individual socket types can hook into the termination process // by overloading it. - void process_term (); + void process_term (int linger_); // Delay actual destruction of the socket. void process_destroy (); diff --git a/src/sub.cpp b/src/sub.cpp index 825b350..20ffd91 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -48,10 +48,10 @@ void zmq::sub_t::xattach_pipes (class reader_t *inpipe_, fq.attach (inpipe_); } -void zmq::sub_t::process_term () +void zmq::sub_t::process_term (int linger_) { fq.terminate (); - socket_base_t::process_term (); + socket_base_t::process_term (linger_); } int zmq::sub_t::xsetsockopt (int option_, const void *optval_, diff --git a/src/sub.hpp b/src/sub.hpp index 06a5333..45c9073 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -48,7 +48,7 @@ namespace zmq private: // Hook into the termination process. - void process_term (); + void process_term (int linger_); // Check whether the message matches at least one subscription. bool match (zmq_msg_t *msg_); diff --git a/src/xrep.cpp b/src/xrep.cpp index b0ac601..b3e8ebd 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -80,7 +80,7 @@ void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, } } -void zmq::xrep_t::process_term () +void zmq::xrep_t::process_term (int linger_) { terminating = true; @@ -93,7 +93,7 @@ void zmq::xrep_t::process_term () it++) it->second.writer->terminate (); - socket_base_t::process_term (); + socket_base_t::process_term (linger_); } void zmq::xrep_t::terminated (reader_t *pipe_) diff --git a/src/xrep.hpp b/src/xrep.hpp index 4831aaf..575fc44 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -52,7 +52,7 @@ namespace zmq private: // Hook into the termination process. - void process_term (); + void process_term (int linger_); // i_reader_events interface implementation. void activated (reader_t *pipe_); diff --git a/src/xreq.cpp b/src/xreq.cpp index 9f4eb92..017f127 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -44,11 +44,11 @@ void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_, lb.attach (outpipe_); } -void zmq::xreq_t::process_term () +void zmq::xreq_t::process_term (int linger_) { fq.terminate (); lb.terminate (); - socket_base_t::process_term (); + socket_base_t::process_term (linger_); } int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) diff --git a/src/xreq.hpp b/src/xreq.hpp index eeb349d..6dbc117 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -47,7 +47,7 @@ namespace zmq private: // Hook into the termination process. - void process_term (); + void process_term (int linger_); // Messages are fair-queued from inbound pipes. And load-balanced to // the outbound pipes. diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 46ac8b5..82c3ca1 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -36,12 +36,11 @@ zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_, class session_t *session_, const options_t &options_, const char *protocol_, const char *address_) : - own_t (io_thread_), + own_t (io_thread_, options_), io_object_t (io_thread_), handle_valid (false), wait (wait_before_connect), - session (session_), - options (options_) + session (session_) { int rc = tcp_connecter.set_address (protocol_, address_); zmq_assert (rc == 0); diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index 7a516f0..3af78cb 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -23,7 +23,6 @@ #include "own.hpp" #include "io_object.hpp" #include "tcp_connecter.hpp" -#include "options.hpp" #include "stdint.hpp" namespace zmq @@ -75,9 +74,6 @@ namespace zmq // Reference to the session we belong to. class session_t *session; - // Associated socket options. - options_t options; - zmq_connecter_t (const zmq_connecter_t&); void operator = (const zmq_connecter_t&); }; diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index ba9706c..b5c5c86 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -33,12 +33,11 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, socket_base_t *socket_, session_t *session_, fd_t fd_, const options_t &options_) : - own_t (io_thread_), + own_t (io_thread_, options_), sent (false), received (false), socket (socket_), session (session_), - options (options_), io_thread (io_thread_) { // Create the engine object for this connection. diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp index 887f070..6087de9 100644 --- a/src/zmq_init.hpp +++ b/src/zmq_init.hpp @@ -25,7 +25,6 @@ #include "own.hpp" #include "fd.hpp" #include "stdint.hpp" -#include "options.hpp" #include "stdint.hpp" #include "blob.hpp" @@ -76,9 +75,6 @@ namespace zmq // Identity of the peer socket. blob_t peer_identity; - // Associated socket options. - options_t options; - // I/O thread the object is living in. It will be used to plug // the engine into the same I/O thread. class io_thread_t *io_thread; diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index 78e44e6..14e5249 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -26,9 +26,8 @@ zmq::zmq_listener_t::zmq_listener_t (io_thread_t *io_thread_, socket_base_t *socket_, const options_t &options_) : - own_t (io_thread_), + own_t (io_thread_, options_), io_object_t (io_thread_), - options (options_), socket (socket_) { } @@ -49,10 +48,10 @@ void zmq::zmq_listener_t::process_plug () set_pollin (handle); } -void zmq::zmq_listener_t::process_term () +void zmq::zmq_listener_t::process_term (int linger_) { rm_fd (handle); - own_t::process_term (); + own_t::process_term (linger_); } void zmq::zmq_listener_t::in_event () diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp index 0a5ef0f..f157cf6 100644 --- a/src/zmq_listener.hpp +++ b/src/zmq_listener.hpp @@ -23,7 +23,6 @@ #include "own.hpp" #include "io_object.hpp" #include "tcp_listener.hpp" -#include "options.hpp" #include "stdint.hpp" namespace zmq @@ -44,7 +43,7 @@ namespace zmq // Handlers for incoming commands. void process_plug (); - void process_term (); + void process_term (int linger_); // Handlers for I/O events. void in_event (); @@ -55,9 +54,6 @@ namespace zmq // Handle corresponding to the listening socket. handle_t handle; - // Associated socket options. - options_t options; - // Socket the listerner belongs to. class socket_base_t *socket; |