summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThijs Terlouw <thijsterlouw@gmail.com>2011-01-26 07:01:06 +0100
committerMartin Sustrik <sustrik@250bpm.com>2011-01-26 07:01:06 +0100
commitf7f1dfc86dd649edbd789a5d157d74721338c912 (patch)
tree4189c915385050b759eb2a2890adcba25885a476 /src
parent8e61a11b398c95d829f24c388737eb122405c97b (diff)
ZMQ_RECONNECT_IVL_MAX socket option added
It allows for exponential back-off strategy when reconnecting. Signed-off-by: Thijs Terlouw <thijsterlouw@gmail.com>
Diffstat (limited to 'src')
-rw-r--r--src/options.cpp22
-rw-r--r--src/options.hpp6
-rw-r--r--src/zmq_connecter.cpp55
-rw-r--r--src/zmq_connecter.hpp12
4 files changed, 75 insertions, 20 deletions
diff --git a/src/options.cpp b/src/options.cpp
index ae35059..c6d5760 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -37,6 +37,7 @@ zmq::options_t::options_t () :
type (-1),
linger (-1),
reconnect_ivl (100),
+ reconnect_ivl_max (0),
backlog (100),
requires_in (false),
requires_out (false),
@@ -161,6 +162,18 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
reconnect_ivl = *((int*) optval_);
return 0;
+ case ZMQ_RECONNECT_IVL_MAX:
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ if (*((int*) optval_) < 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ reconnect_ivl_max = *((int*) optval_);
+ return 0;
+
case ZMQ_BACKLOG:
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
@@ -297,6 +310,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int);
return 0;
+ case ZMQ_RECONNECT_IVL_MAX:
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = reconnect_ivl_max;
+ *optvallen_ = sizeof (int);
+ return 0;
+
case ZMQ_BACKLOG:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index e6df505..38c6982 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -59,8 +59,12 @@ namespace zmq
// Linger time, in milliseconds.
int linger;
- // Interval between attempts to reconnect, in milliseconds.
+ // Minimum interval between attempts to reconnect, in milliseconds.
+ // Default 100ms
int reconnect_ivl;
+ // Maximum interval between attempts to reconnect, in milliseconds.
+ // Default 0 (unused)
+ int reconnect_ivl_max;
// Maximum backlog for pending connections.
int backlog;
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index 8a6d89a..d2638ac 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -40,10 +40,11 @@ zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_,
io_object_t (io_thread_),
handle_valid (false),
wait (wait_before_connect),
- session (session_)
+ session (session_),
+ current_reconnect_ivl(options.reconnect_ivl)
{
- int rc = tcp_connecter.set_address (protocol_, address_);
- zmq_assert (rc == 0);
+ int rc = tcp_connecter.set_address (protocol_, address_);
+ zmq_assert (rc == 0);
}
zmq::zmq_connecter_t::~zmq_connecter_t ()
@@ -54,21 +55,10 @@ zmq::zmq_connecter_t::~zmq_connecter_t ()
rm_fd (handle);
}
-int zmq::zmq_connecter_t::get_reconnect_ivl ()
-{
-#if defined ZMQ_HAVE_WINDOWS
- return (options.reconnect_ivl + (((int) GetCurrentProcessId () * 13)
- % options.reconnect_ivl));
-#else
- return (options.reconnect_ivl + (((int) getpid () * 13)
- % options.reconnect_ivl));
-#endif
-}
-
void zmq::zmq_connecter_t::process_plug ()
{
if (wait)
- add_timer (get_reconnect_ivl (), reconnect_timer_id);
+ add_reconnect_timer();
else
start_connecting ();
}
@@ -91,7 +81,7 @@ void zmq::zmq_connecter_t::out_event ()
if (fd == retired_fd) {
tcp_connecter.close ();
wait = true;
- add_timer (get_reconnect_ivl (), reconnect_timer_id);
+ add_reconnect_timer();
return;
}
@@ -140,5 +130,36 @@ void zmq::zmq_connecter_t::start_connecting ()
// Handle any other error condition by eventual reconnect.
wait = true;
- add_timer (get_reconnect_ivl (), reconnect_timer_id);
+ add_reconnect_timer();
+}
+
+void zmq::zmq_connecter_t::add_reconnect_timer()
+{
+ add_timer (get_new_reconnect_ivl(), reconnect_timer_id);
+}
+
+int zmq::zmq_connecter_t::get_new_reconnect_ivl ()
+{
+#if defined ZMQ_HAVE_WINDOWS
+ int pid = (int) GetCurrentProcessId ();
+#else
+ int pid = (int) getpid ();
+#endif
+
+ // The new interval is the current interval + random value.
+ int this_interval = current_reconnect_ivl +
+ ((pid * 13) % options.reconnect_ivl);
+
+ // Only change the current reconnect interval if the maximum reconnect
+ // interval was set and if it's larger than the reconnect interval.
+ if (options.reconnect_ivl_max > 0 &&
+ options.reconnect_ivl_max > options.reconnect_ivl) {
+
+ // Calculate the next interval
+ current_reconnect_ivl = current_reconnect_ivl * 2;
+ if(current_reconnect_ivl >= options.reconnect_ivl_max) {
+ current_reconnect_ivl = options.reconnect_ivl_max;
+ }
+ }
+ return this_interval;
}
diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp
index 060f5c9..ef2cd1a 100644
--- a/src/zmq_connecter.hpp
+++ b/src/zmq_connecter.hpp
@@ -55,8 +55,13 @@ namespace zmq
// Internal function to start the actual connection establishment.
void start_connecting ();
- // Internal function to return the reconnect backoff delay.
- int get_reconnect_ivl ();
+ // Internal function to add a reconnect timer
+ void add_reconnect_timer();
+
+ // Internal function to return a reconnect backoff delay.
+ // Will modify the current_reconnect_ivl used for next call
+ // Returns the currently used interval
+ int get_new_reconnect_ivl ();
// Actual connecting socket.
tcp_connecter_t tcp_connecter;
@@ -74,6 +79,9 @@ namespace zmq
// Reference to the session we belong to.
class session_t *session;
+ // Current reconnect ivl, updated for backoff strategy
+ int current_reconnect_ivl;
+
zmq_connecter_t (const zmq_connecter_t&);
const zmq_connecter_t &operator = (const zmq_connecter_t&);
};