diff options
author | Martin Lucina <mato@kotelna.sk> | 2010-10-08 17:49:40 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-10-08 17:49:40 +0200 |
commit | f90c8d957ea2a11c73a307aae2494a26d22c2f3b (patch) | |
tree | 8ead216f7f512394da0e50048bb03f7ed7a3c104 /src/signaler.cpp | |
parent | 7a685b0f88386b11c4c1fcbb45324aa28f4e2eac (diff) |
Scalability improvements for large amounts of connections
Add signaler_sndbuf_size option to config.hpp which allows the user to
increase the size of the send buffer used by the signalling socketpair.
Implement random backoff for reconnection attempts using a primitive
pseudo-random generation to prevent reconnection storms.
Add wait_before_connect option to config.hpp to allow the user to enable
random delay even on initial connect. Default is false for low latency.
Signed-off-by: Martin Lucina <mato@kotelna.sk>
Diffstat (limited to 'src/signaler.cpp')
-rw-r--r-- | src/signaler.cpp | 39 |
1 files changed, 32 insertions, 7 deletions
diff --git a/src/signaler.cpp b/src/signaler.cpp index c14a709..787ce4a 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -49,10 +49,10 @@ zmq::signaler_t::signaler_t () struct sockaddr_in addr; SOCKET listener; int addrlen = sizeof (addr); - + w = INVALID_SOCKET; r = INVALID_SOCKET; - + fd_t rcs = (listener = socket (AF_INET, SOCK_STREAM, 0)); wsa_assert (rcs != INVALID_SOCKET); @@ -60,25 +60,34 @@ zmq::signaler_t::signaler_t () addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); addr.sin_port = 0; - + int rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr)); wsa_assert (rc != SOCKET_ERROR); rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen); wsa_assert (rc != SOCKET_ERROR); - + // Listen for incomming connections. rc = listen (listener, 1); wsa_assert (rc != SOCKET_ERROR); - + // Create the socket. w = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); wsa_assert (w != INVALID_SOCKET); - + + // Increase signaler SNDBUF if requested in config.hpp. + if (signaler_sndbuf_size) { + int sndbuf = signaler_sndbuf_size; + socklen_t sndbuf_size = sizeof sndbuf; + rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, (const char *)&sndbuf, + sndbuf_size); + errno_assert (rc == 0); + } + // Connect to the remote peer. rc = connect (w, (sockaddr *) &addr, sizeof (addr)); wsa_assert (rc != SOCKET_ERROR); - + // Accept connection from w. r = accept (listener, NULL, NULL); wsa_assert (r != INVALID_SOCKET); @@ -170,6 +179,14 @@ zmq::signaler_t::signaler_t () flags = 0; rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); errno_assert (rc != -1); + + // Increase signaler SNDBUF if requested in config.hpp. + if (signaler_sndbuf_size) { + int sndbuf = signaler_sndbuf_size; + socklen_t sndbuf_size = sizeof sndbuf; + rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &sndbuf, sndbuf_size); + errno_assert (rc == 0); + } } zmq::signaler_t::~signaler_t () @@ -248,6 +265,14 @@ zmq::signaler_t::signaler_t () errno_assert (rc == 0); w = sv [0]; r = sv [1]; + + // Increase signaler SNDBUF if requested in config.hpp. + if (signaler_sndbuf_size) { + int sndbuf = signaler_sndbuf_size; + socklen_t sndbuf_size = sizeof sndbuf; + rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &sndbuf, sndbuf_size); + errno_assert (rc == 0); + } } zmq::signaler_t::~signaler_t () |