diff options
author | Martin Lucina <mato@kotelna.sk> | 2010-10-08 17:49:40 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-10-08 17:49:40 +0200 |
commit | f90c8d957ea2a11c73a307aae2494a26d22c2f3b (patch) | |
tree | 8ead216f7f512394da0e50048bb03f7ed7a3c104 /src/zmq_connecter.cpp | |
parent | 7a685b0f88386b11c4c1fcbb45324aa28f4e2eac (diff) |
Scalability improvements for large amounts of connections
Add signaler_sndbuf_size option to config.hpp which allows the user to
increase the size of the send buffer used by the signalling socketpair.
Implement random backoff for reconnection attempts using a primitive
pseudo-random generation to prevent reconnection storms.
Add wait_before_connect option to config.hpp to allow the user to enable
random delay even on initial connect. Default is false for low latency.
Signed-off-by: Martin Lucina <mato@kotelna.sk>
Diffstat (limited to 'src/zmq_connecter.cpp')
-rw-r--r-- | src/zmq_connecter.cpp | 25 |
1 files changed, 21 insertions, 4 deletions
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 1e83529..a36c671 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -19,6 +19,13 @@ #include <new> +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include <sys/types.h> +#include <unistd.h> +#endif + #include "zmq_connecter.hpp" #include "zmq_engine.hpp" #include "zmq_init.hpp" @@ -31,7 +38,7 @@ zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_, own_t (io_thread_), io_object_t (io_thread_), handle_valid (false), - wait (false), + wait (wait_before_connect), session (session_), options (options_) { @@ -47,10 +54,20 @@ zmq::zmq_connecter_t::~zmq_connecter_t () rm_fd (handle); } +int zmq::zmq_connecter_t::get_reconnect_period () +{ +#if defined ZMQ_HAVE_WINDOWS + return (reconnect_period + (((int)GetCurrentProcessId () * 13) + % reconnect_period)); +#else + return (reconnect_period + (((int)getpid () * 13) % reconnect_period)); +#endif +} + void zmq::zmq_connecter_t::process_plug () { if (wait) - add_timer (reconnect_period, reconnect_timer_id); + add_timer (get_reconnect_period (), reconnect_timer_id); else start_connecting (); } @@ -73,7 +90,7 @@ void zmq::zmq_connecter_t::out_event () if (fd == retired_fd) { tcp_connecter.close (); wait = true; - add_timer (reconnect_period, reconnect_timer_id); + add_timer (get_reconnect_period (), reconnect_timer_id); return; } @@ -122,5 +139,5 @@ void zmq::zmq_connecter_t::start_connecting () // Handle any other error condition by eventual reconnect. wait = true; - add_timer (reconnect_period, reconnect_timer_id); + add_timer (get_reconnect_period (), reconnect_timer_id); } |