diff options
-rw-r--r-- | src/config.hpp | 9 | ||||
-rw-r--r-- | src/signaler.cpp | 39 | ||||
-rw-r--r-- | src/zmq_connecter.cpp | 25 | ||||
-rw-r--r-- | src/zmq_connecter.hpp | 3 |
4 files changed, 64 insertions, 12 deletions
diff --git a/src/config.hpp b/src/config.hpp index 57c2373..0d24d40 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -39,6 +39,10 @@ namespace zmq // using a single system call. signal_buffer_size = 8, + // Socketpair send buffer size used by signaler. The default value of + // zero means leave it at the system default. + signaler_sndbuf_size = 0, + // Determines how often does socket poll for new commands when it // still has unprocessed messages to handle. Thus, if it is set to 100, // socket will process 100 inbound messages before doing the poll. @@ -72,6 +76,9 @@ namespace zmq // How long to wait (milliseconds) till reattempting to connect. reconnect_period = 100, + // Should initial connection attempts be delayed? + wait_before_connect = false, + // Maximal delay to process command in API thread (in CPU ticks). // 3,000,000 ticks equals to 1 - 2 milliseconds on current CPUs. // Note that delay is only applied when there is continuous stream of @@ -87,7 +94,7 @@ namespace zmq // Maximal number of non-accepted connections that can be held by // TCP listener object. - tcp_connection_backlog = 10, + tcp_connection_backlog = 100, // Maximum transport data unit size for PGM (TPDU). pgm_max_tpdu = 1500 diff --git a/src/signaler.cpp b/src/signaler.cpp index c14a709..787ce4a 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -49,10 +49,10 @@ zmq::signaler_t::signaler_t () struct sockaddr_in addr; SOCKET listener; int addrlen = sizeof (addr); - + w = INVALID_SOCKET; r = INVALID_SOCKET; - + fd_t rcs = (listener = socket (AF_INET, SOCK_STREAM, 0)); wsa_assert (rcs != INVALID_SOCKET); @@ -60,25 +60,34 @@ zmq::signaler_t::signaler_t () addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); addr.sin_port = 0; - + int rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr)); wsa_assert (rc != SOCKET_ERROR); rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen); wsa_assert (rc != SOCKET_ERROR); - + // Listen for incomming connections. rc = listen (listener, 1); wsa_assert (rc != SOCKET_ERROR); - + // Create the socket. w = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); wsa_assert (w != INVALID_SOCKET); - + + // Increase signaler SNDBUF if requested in config.hpp. + if (signaler_sndbuf_size) { + int sndbuf = signaler_sndbuf_size; + socklen_t sndbuf_size = sizeof sndbuf; + rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, (const char *)&sndbuf, + sndbuf_size); + errno_assert (rc == 0); + } + // Connect to the remote peer. rc = connect (w, (sockaddr *) &addr, sizeof (addr)); wsa_assert (rc != SOCKET_ERROR); - + // Accept connection from w. r = accept (listener, NULL, NULL); wsa_assert (r != INVALID_SOCKET); @@ -170,6 +179,14 @@ zmq::signaler_t::signaler_t () flags = 0; rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); errno_assert (rc != -1); + + // Increase signaler SNDBUF if requested in config.hpp. + if (signaler_sndbuf_size) { + int sndbuf = signaler_sndbuf_size; + socklen_t sndbuf_size = sizeof sndbuf; + rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &sndbuf, sndbuf_size); + errno_assert (rc == 0); + } } zmq::signaler_t::~signaler_t () @@ -248,6 +265,14 @@ zmq::signaler_t::signaler_t () errno_assert (rc == 0); w = sv [0]; r = sv [1]; + + // Increase signaler SNDBUF if requested in config.hpp. + if (signaler_sndbuf_size) { + int sndbuf = signaler_sndbuf_size; + socklen_t sndbuf_size = sizeof sndbuf; + rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &sndbuf, sndbuf_size); + errno_assert (rc == 0); + } } zmq::signaler_t::~signaler_t () diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 1e83529..a36c671 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -19,6 +19,13 @@ #include <new> +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include <sys/types.h> +#include <unistd.h> +#endif + #include "zmq_connecter.hpp" #include "zmq_engine.hpp" #include "zmq_init.hpp" @@ -31,7 +38,7 @@ zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_, own_t (io_thread_), io_object_t (io_thread_), handle_valid (false), - wait (false), + wait (wait_before_connect), session (session_), options (options_) { @@ -47,10 +54,20 @@ zmq::zmq_connecter_t::~zmq_connecter_t () rm_fd (handle); } +int zmq::zmq_connecter_t::get_reconnect_period () +{ +#if defined ZMQ_HAVE_WINDOWS + return (reconnect_period + (((int)GetCurrentProcessId () * 13) + % reconnect_period)); +#else + return (reconnect_period + (((int)getpid () * 13) % reconnect_period)); +#endif +} + void zmq::zmq_connecter_t::process_plug () { if (wait) - add_timer (reconnect_period, reconnect_timer_id); + add_timer (get_reconnect_period (), reconnect_timer_id); else start_connecting (); } @@ -73,7 +90,7 @@ void zmq::zmq_connecter_t::out_event () if (fd == retired_fd) { tcp_connecter.close (); wait = true; - add_timer (reconnect_period, reconnect_timer_id); + add_timer (get_reconnect_period (), reconnect_timer_id); return; } @@ -122,5 +139,5 @@ void zmq::zmq_connecter_t::start_connecting () // Handle any other error condition by eventual reconnect. wait = true; - add_timer (reconnect_period, reconnect_timer_id); + add_timer (get_reconnect_period (), reconnect_timer_id); } diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index 16bfc31..7a516f0 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -56,6 +56,9 @@ namespace zmq // Internal function to start the actual connection establishment. void start_connecting (); + // Internal function to return the reconnect backoff delay. + int get_reconnect_period (); + // Actual connecting socket. tcp_connecter_t tcp_connecter; |