From f7f1dfc86dd649edbd789a5d157d74721338c912 Mon Sep 17 00:00:00 2001 From: Thijs Terlouw Date: Wed, 26 Jan 2011 07:01:06 +0100 Subject: ZMQ_RECONNECT_IVL_MAX socket option added It allows for exponential back-off strategy when reconnecting. Signed-off-by: Thijs Terlouw --- src/options.cpp | 22 +++++++++++++++++++++ src/options.hpp | 6 +++++- src/zmq_connecter.cpp | 55 +++++++++++++++++++++++++++++++++++---------------- src/zmq_connecter.hpp | 12 +++++++++-- 4 files changed, 75 insertions(+), 20 deletions(-) (limited to 'src') diff --git a/src/options.cpp b/src/options.cpp index ae35059..c6d5760 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -37,6 +37,7 @@ zmq::options_t::options_t () : type (-1), linger (-1), reconnect_ivl (100), + reconnect_ivl_max (0), backlog (100), requires_in (false), requires_out (false), @@ -161,6 +162,18 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, reconnect_ivl = *((int*) optval_); return 0; + case ZMQ_RECONNECT_IVL_MAX: + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + if (*((int*) optval_) < 0) { + errno = EINVAL; + return -1; + } + reconnect_ivl_max = *((int*) optval_); + return 0; + case ZMQ_BACKLOG: if (optvallen_ != sizeof (int)) { errno = EINVAL; @@ -297,6 +310,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (int); return 0; + case ZMQ_RECONNECT_IVL_MAX: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = reconnect_ivl_max; + *optvallen_ = sizeof (int); + return 0; + case ZMQ_BACKLOG: if (*optvallen_ < sizeof (int)) { errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index e6df505..38c6982 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -59,8 +59,12 @@ namespace zmq // Linger time, in milliseconds. int linger; - // Interval between attempts to reconnect, in milliseconds. + // Minimum interval between attempts to reconnect, in milliseconds. + // Default 100ms int reconnect_ivl; + // Maximum interval between attempts to reconnect, in milliseconds. + // Default 0 (unused) + int reconnect_ivl_max; // Maximum backlog for pending connections. int backlog; diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 8a6d89a..d2638ac 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -40,10 +40,11 @@ zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_, io_object_t (io_thread_), handle_valid (false), wait (wait_before_connect), - session (session_) + session (session_), + current_reconnect_ivl(options.reconnect_ivl) { - int rc = tcp_connecter.set_address (protocol_, address_); - zmq_assert (rc == 0); + int rc = tcp_connecter.set_address (protocol_, address_); + zmq_assert (rc == 0); } zmq::zmq_connecter_t::~zmq_connecter_t () @@ -54,21 +55,10 @@ zmq::zmq_connecter_t::~zmq_connecter_t () rm_fd (handle); } -int zmq::zmq_connecter_t::get_reconnect_ivl () -{ -#if defined ZMQ_HAVE_WINDOWS - return (options.reconnect_ivl + (((int) GetCurrentProcessId () * 13) - % options.reconnect_ivl)); -#else - return (options.reconnect_ivl + (((int) getpid () * 13) - % options.reconnect_ivl)); -#endif -} - void zmq::zmq_connecter_t::process_plug () { if (wait) - add_timer (get_reconnect_ivl (), reconnect_timer_id); + add_reconnect_timer(); else start_connecting (); } @@ -91,7 +81,7 @@ void zmq::zmq_connecter_t::out_event () if (fd == retired_fd) { tcp_connecter.close (); wait = true; - add_timer (get_reconnect_ivl (), reconnect_timer_id); + add_reconnect_timer(); return; } @@ -140,5 +130,36 @@ void zmq::zmq_connecter_t::start_connecting () // Handle any other error condition by eventual reconnect. wait = true; - add_timer (get_reconnect_ivl (), reconnect_timer_id); + add_reconnect_timer(); +} + +void zmq::zmq_connecter_t::add_reconnect_timer() +{ + add_timer (get_new_reconnect_ivl(), reconnect_timer_id); +} + +int zmq::zmq_connecter_t::get_new_reconnect_ivl () +{ +#if defined ZMQ_HAVE_WINDOWS + int pid = (int) GetCurrentProcessId (); +#else + int pid = (int) getpid (); +#endif + + // The new interval is the current interval + random value. + int this_interval = current_reconnect_ivl + + ((pid * 13) % options.reconnect_ivl); + + // Only change the current reconnect interval if the maximum reconnect + // interval was set and if it's larger than the reconnect interval. + if (options.reconnect_ivl_max > 0 && + options.reconnect_ivl_max > options.reconnect_ivl) { + + // Calculate the next interval + current_reconnect_ivl = current_reconnect_ivl * 2; + if(current_reconnect_ivl >= options.reconnect_ivl_max) { + current_reconnect_ivl = options.reconnect_ivl_max; + } + } + return this_interval; } diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index 060f5c9..ef2cd1a 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -55,8 +55,13 @@ namespace zmq // Internal function to start the actual connection establishment. void start_connecting (); - // Internal function to return the reconnect backoff delay. - int get_reconnect_ivl (); + // Internal function to add a reconnect timer + void add_reconnect_timer(); + + // Internal function to return a reconnect backoff delay. + // Will modify the current_reconnect_ivl used for next call + // Returns the currently used interval + int get_new_reconnect_ivl (); // Actual connecting socket. tcp_connecter_t tcp_connecter; @@ -74,6 +79,9 @@ namespace zmq // Reference to the session we belong to. class session_t *session; + // Current reconnect ivl, updated for backoff strategy + int current_reconnect_ivl; + zmq_connecter_t (const zmq_connecter_t&); const zmq_connecter_t &operator = (const zmq_connecter_t&); }; -- cgit v1.2.3