summaryrefslogtreecommitdiff
path: root/src/zmq_connecter.cpp
diff options
context:
space:
mode:
authorMartin Lucina <mato@kotelna.sk>2010-10-08 17:49:40 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-10-08 17:49:40 +0200
commitf90c8d957ea2a11c73a307aae2494a26d22c2f3b (patch)
tree8ead216f7f512394da0e50048bb03f7ed7a3c104 /src/zmq_connecter.cpp
parent7a685b0f88386b11c4c1fcbb45324aa28f4e2eac (diff)
Scalability improvements for large amounts of connections
Add signaler_sndbuf_size option to config.hpp which allows the user to increase the size of the send buffer used by the signalling socketpair. Implement random backoff for reconnection attempts using a primitive pseudo-random generation to prevent reconnection storms. Add wait_before_connect option to config.hpp to allow the user to enable random delay even on initial connect. Default is false for low latency. Signed-off-by: Martin Lucina <mato@kotelna.sk>
Diffstat (limited to 'src/zmq_connecter.cpp')
-rw-r--r--src/zmq_connecter.cpp25
1 files changed, 21 insertions, 4 deletions
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index 1e83529..a36c671 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -19,6 +19,13 @@
#include <new>
+#if defined ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#else
+#include <sys/types.h>
+#include <unistd.h>
+#endif
+
#include "zmq_connecter.hpp"
#include "zmq_engine.hpp"
#include "zmq_init.hpp"
@@ -31,7 +38,7 @@ zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_,
own_t (io_thread_),
io_object_t (io_thread_),
handle_valid (false),
- wait (false),
+ wait (wait_before_connect),
session (session_),
options (options_)
{
@@ -47,10 +54,20 @@ zmq::zmq_connecter_t::~zmq_connecter_t ()
rm_fd (handle);
}
+int zmq::zmq_connecter_t::get_reconnect_period ()
+{
+#if defined ZMQ_HAVE_WINDOWS
+ return (reconnect_period + (((int)GetCurrentProcessId () * 13)
+ % reconnect_period));
+#else
+ return (reconnect_period + (((int)getpid () * 13) % reconnect_period));
+#endif
+}
+
void zmq::zmq_connecter_t::process_plug ()
{
if (wait)
- add_timer (reconnect_period, reconnect_timer_id);
+ add_timer (get_reconnect_period (), reconnect_timer_id);
else
start_connecting ();
}
@@ -73,7 +90,7 @@ void zmq::zmq_connecter_t::out_event ()
if (fd == retired_fd) {
tcp_connecter.close ();
wait = true;
- add_timer (reconnect_period, reconnect_timer_id);
+ add_timer (get_reconnect_period (), reconnect_timer_id);
return;
}
@@ -122,5 +139,5 @@ void zmq::zmq_connecter_t::start_connecting ()
// Handle any other error condition by eventual reconnect.
wait = true;
- add_timer (reconnect_period, reconnect_timer_id);
+ add_timer (get_reconnect_period (), reconnect_timer_id);
}