summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Lucina <mato@kotelna.sk>2010-11-04 17:54:47 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-11-04 17:54:47 +0100
commit1b39bcd88336ebed002e13a8ac3d4bc104b13718 (patch)
tree065bafb75b1eabaeb7bb858d09a8fd77895916b8
parent756f7df8c8e82a67f033049af47a3f783ad951e1 (diff)
Automatically resize signalling socket buffer if full
If the socketpair used by signaler_t fills up, this can lead to deadlock. This patch provides partial resolution by attempting to resize SO_SNDBUF on the writer side, and if that fails we shall at least assert rather than hang. I've also refactored the signaler_t code to make the platform-dependent parts clearer and have tested both the MSG_DONTWAIT and standard POSIX path in recv. The Win32 implementation currently does not implement resizing as I'm not convinced that it's safe, but it will also assert like other platforms if signaler_t::send() cannot succeed. The OpenVMS implementation has been carried forward but is untested. Signed-off-by: Martin Lucina <mato@kotelna.sk>
-rw-r--r--src/config.hpp4
-rw-r--r--src/signaler.cpp403
-rw-r--r--src/signaler.hpp18
3 files changed, 201 insertions, 224 deletions
diff --git a/src/config.hpp b/src/config.hpp
index fb1b0d4..1db3bb6 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -35,10 +35,6 @@ namespace zmq
// memory allocation by approximately 99.6%
message_pipe_granularity = 256,
- // Socketpair send buffer size used by signaler. The default value of
- // zero means leave it at the system default.
- signaler_sndbuf_size = 0,
-
// Determines how often does socket poll for new commands when it
// still has unprocessed messages to handle. Thus, if it is set to 100,
// socket will process 100 inbound messages before doing the poll.
diff --git a/src/signaler.cpp b/src/signaler.cpp
index 08edb1e..a692078 100644
--- a/src/signaler.cpp
+++ b/src/signaler.cpp
@@ -23,15 +23,16 @@
#include "fd.hpp"
#include "ip.hpp"
-#if defined ZMQ_HAVE_OPENVMS
-#include <netinet/tcp.h>
-#include <unistd.h>
-#elif defined ZMQ_HAVE_WINDOWS
+#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#include <fcntl.h>
#include <limits.h>
+#include <netinet/tcp.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
#endif
zmq::fd_t zmq::signaler_t::get_fd ()
@@ -43,62 +44,18 @@ zmq::fd_t zmq::signaler_t::get_fd ()
zmq::signaler_t::signaler_t ()
{
- // Windows have no 'socketpair' function. CreatePipe is no good as pipe
- // handles cannot be polled on. Here we create the socketpair by hand.
-
- struct sockaddr_in addr;
- SOCKET listener;
- int addrlen = sizeof (addr);
-
- w = INVALID_SOCKET;
- r = INVALID_SOCKET;
-
- fd_t rcs = (listener = socket (AF_INET, SOCK_STREAM, 0));
- wsa_assert (rcs != INVALID_SOCKET);
-
- memset (&addr, 0, sizeof (addr));
- addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
- addr.sin_port = 0;
-
- int rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
- wsa_assert (rc != SOCKET_ERROR);
-
- rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
- wsa_assert (rc != SOCKET_ERROR);
-
- // Listen for incomming connections.
- rc = listen (listener, 1);
- wsa_assert (rc != SOCKET_ERROR);
-
- // Create the socket.
- w = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
- wsa_assert (w != INVALID_SOCKET);
-
- // Increase signaler SNDBUF if requested in config.hpp.
- if (signaler_sndbuf_size) {
- int sndbuf = signaler_sndbuf_size;
- socklen_t sndbuf_size = sizeof sndbuf;
- rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, (const char *)&sndbuf,
- sndbuf_size);
- errno_assert (rc == 0);
- }
-
- // Connect to the remote peer.
- rc = connect (w, (sockaddr *) &addr, sizeof (addr));
- wsa_assert (rc != SOCKET_ERROR);
-
- // Accept connection from w.
- r = accept (listener, NULL, NULL);
- wsa_assert (r != INVALID_SOCKET);
+ // Create the socketpair for signalling.
+ int rc = make_socketpair (&r, &w);
+ errno_assert (rc == 0);
- // Set the read site of the pair to non-blocking mode.
+ // Set the writer to non-blocking mode.
unsigned long argp = 1;
- rc = ioctlsocket (r, FIONBIO, &argp);
+ rc = ioctlsocket (w, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR);
- // We don't need the listening socket anymore. Close it.
- rc = closesocket (listener);
+ // Set the reader to non-blocking mode.
+ argp = 1;
+ rc = ioctlsocket (r, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR);
}
@@ -113,80 +70,77 @@ zmq::signaler_t::~signaler_t ()
void zmq::signaler_t::send (const command_t &cmd_)
{
- // TODO: Note that send is a blocking operation.
- // How should we behave if the signal cannot be written to the signaler?
- // Even worse: What if half of a command is written?
- int rc = ::send (w, (char*) &cmd_, sizeof (command_t), 0);
- win_assert (rc != SOCKET_ERROR);
- zmq_assert (rc == sizeof (command_t));
+ // TODO: Implement SNDBUF auto-resizing as for POSIX platforms.
+ // In the mean time, the following code with assert if the send()
+ // call would block.
+ int nbytes = ::send (w, (char *)&cmd_, sizeof (command_t), 0);
+ wsa_assert (nbytes != SOCKET_ERROR);
+ zmq_assert (nbytes == sizeof (command_t));
}
int zmq::signaler_t::recv (command_t *cmd_, bool block_)
{
if (block_) {
-
- // Switch to blocking mode.
+ // Set the reader to blocking mode.
unsigned long argp = 0;
int rc = ioctlsocket (r, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR);
}
-
- int err;
- int result;
- int nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0);
+ // Attempt to read an entire command. Returns EAGAIN if non-blocking
+ // and a command is not available.
+ int err = 0;
+ int nbytes = ::recv (r, (char *)cmd_, sizeof (command_t), 0);
if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) {
+ // Save value of errno if we wish to pass it to caller.
err = EAGAIN;
- result = -1;
- }
- else {
- wsa_assert (nbytes != -1);
-
- // Check whether we haven't got half of a signal.
- zmq_assert (nbytes % sizeof (uint32_t) == 0);
-
- result = 0;
}
-
if (block_) {
-
- // Switch back to non-blocking mode.
+ // Re-set the reader to non-blocking mode.
unsigned long argp = 1;
int rc = ioctlsocket (r, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR);
}
-
- if (result == -1)
+ // If the recv failed, return with the saved errno if set.
+ if (err != 0) {
errno = err;
- return result;
-}
+ return -1;
+ }
+ // Sanity check for success.
+ wsa_assert (nbytes != SOCKET_ERROR);
-#elif defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX
+ // Check whether we haven't got half of command.
+ zmq_assert (nbytes == sizeof (command_t));
+ return 0;
+}
-#include <sys/types.h>
-#include <sys/socket.h>
+#else // !ZMQ_HAVE_WINDOWS
zmq::signaler_t::signaler_t ()
{
- int sv [2];
- int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
+#ifdef PIPE_BUF
+ // Make sure that command can be written to the socket in atomic fashion.
+ // If this wasn't guaranteed, commands from different threads would be
+ // interleaved.
+ zmq_assert (sizeof (command_t) <= PIPE_BUF);
+#endif
+
+ // Create the socketpair for signalling.
+ int rc = make_socketpair (&r, &w);
+ errno_assert (rc == 0);
+
+ // Set the writer to non-blocking mode.
+ int flags = fcntl (w, F_GETFL, 0);
+ errno_assert (flags >= 0);
+ rc = fcntl (w, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc == 0);
- w = sv [0];
- r = sv [1];
+#ifndef MSG_DONTWAIT
// Set the reader to non-blocking mode.
- int flags = fcntl (r, F_GETFL, 0);
- if (flags == -1)
- flags = 0;
+ flags = fcntl (r, F_GETFL, 0);
+ errno_assert (flags >= 0);
rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
- errno_assert (rc != -1);
-
- // Increase signaler SNDBUF if requested in config.hpp.
- if (signaler_sndbuf_size) {
- int sndbuf = signaler_sndbuf_size;
- socklen_t sndbuf_size = sizeof sndbuf;
- rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &sndbuf, sndbuf_size);
- errno_assert (rc == 0);
- }
+ errno_assert (rc == 0);
+#endif
}
zmq::signaler_t::~signaler_t ()
@@ -197,144 +151,172 @@ zmq::signaler_t::~signaler_t ()
void zmq::signaler_t::send (const command_t &cmd_)
{
+ // Attempt to write an entire command without blocking.
ssize_t nbytes;
do {
nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
} while (nbytes == -1 && errno == EINTR);
+ // Attempt to increase signaler SNDBUF if the send failed.
+ if (nbytes == -1 && errno == EAGAIN) {
+ int old_sndbuf, new_sndbuf;
+ socklen_t sndbuf_size = sizeof old_sndbuf;
+ // Retrieve current send buffer size.
+ int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf,
+ &sndbuf_size);
+ errno_assert (rc == 0);
+ new_sndbuf = old_sndbuf * 2;
+ // Double the new send buffer size.
+ rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size);
+ errno_assert (rc == 0);
+ // Verify that the OS actually honored the request.
+ rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size);
+ errno_assert (rc == 0);
+ zmq_assert (new_sndbuf > old_sndbuf);
+ // Retry the sending operation; at this point it must succeed.
+ do {
+ nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
+ } while (nbytes == -1 && errno == EINTR);
+ }
errno_assert (nbytes != -1);
+
+ // This should never happen as we've already checked that command size is
+ // less than PIPE_BUF.
zmq_assert (nbytes == sizeof (command_t));
}
int zmq::signaler_t::recv (command_t *cmd_, bool block_)
{
+#ifdef MSG_DONTWAIT
+ // Attempt to read an entire command. Returns EAGAIN if non-blocking
+ // mode is requested and a command is not available.
+ ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),
+ block_ ? 0 : MSG_DONTWAIT);
+ if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
+ return -1;
+#else
if (block_) {
-
// Set the reader to blocking mode.
int flags = fcntl (r, F_GETFL, 0);
- if (flags == -1)
- flags = 0;
+ errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
- errno_assert (rc != -1);
+ errno_assert (rc == 0);
}
-
- int err;
- int result;
- ssize_t nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0);
+ // Attempt to read an entire command. Returns EAGAIN if non-blocking
+ // and a command is not available.
+ int err = 0;
+ ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) {
+ // Save value of errno if we wish to pass it to caller.
err = errno;
- result = -1;
- }
- else {
- zmq_assert (nbytes != -1);
-
- // Check whether we haven't got half of command.
- zmq_assert (nbytes == sizeof (command_t));
-
- result = 0;
}
-
- if (block_) {
-
- // Set the reader to non-blocking mode.
+ if (block_) {
+ // Re-set the reader to non-blocking mode.
int flags = fcntl (r, F_GETFL, 0);
- if (flags == -1)
- flags = 0;
+ errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
- errno_assert (rc != -1);
+ errno_assert (rc == 0);
}
-
- if (result == -1)
+ // If the recv failed, return with the saved errno if set.
+ if (err != 0) {
errno = err;
- return result;
-}
+ return -1;
+ }
+#endif
+ // Sanity check for success.
+ errno_assert (nbytes != -1);
-#else
+ // Check whether we haven't got half of command.
+ zmq_assert (nbytes == sizeof (command_t));
-#include <sys/types.h>
-#include <sys/socket.h>
+ return 0;
+}
-zmq::signaler_t::signaler_t ()
+#endif
+
+int zmq::signaler_t::make_socketpair (fd_t *r_, fd_t *w_)
{
- // Make sure that command can be written to the socket in atomic fashion.
- // If this wasn't guaranteed, commands from different threads would be
- // interleaved.
- zmq_assert (sizeof (command_t) <= PIPE_BUF);
+#if defined ZMQ_HAVE_WINDOWS
- int sv [2];
- int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
- errno_assert (rc == 0);
- w = sv [0];
- r = sv [1];
-
- // Increase signaler SNDBUF if requested in config.hpp.
- if (signaler_sndbuf_size) {
- int sndbuf = signaler_sndbuf_size;
- socklen_t sndbuf_size = sizeof sndbuf;
- rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &sndbuf, sndbuf_size);
- errno_assert (rc == 0);
- }
-}
+ // Windows has no 'socketpair' function. CreatePipe is no good as pipe
+ // handles cannot be polled on. Here we create the socketpair by hand.
+ *w_ = INVALID_SOCKET;
+ *r_ = INVALID_SOCKET;
-zmq::signaler_t::~signaler_t ()
-{
- close (w);
- close (r);
-}
+ // Create listening socket.
+ SOCKET listener;
+ listener = socket (AF_INET, SOCK_STREAM, 0);
+ wsa_assert (listener != INVALID_SOCKET);
-void zmq::signaler_t::send (const command_t &cmd_)
-{
- // TODO: Note that send is a blocking operation.
- // How should we behave if the command cannot be written to the signaler?
- ssize_t nbytes;
- do {
- nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
- } while (nbytes == -1 && errno == EINTR);
- errno_assert (nbytes != -1);
+ // Set SO_REUSEADDR and TCP_NODELAY on listening socket.
+ BOOL so_reuseaddr = 1;
+ int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
+ (char *)&so_reuseaddr, sizeof (so_reuseaddr));
+ wsa_assert (rc != SOCKET_ERROR);
+ BOOL tcp_nodelay = 1;
+ rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
+ (char *)&tcp_nodelay, sizeof (tcp_nodelay));
+ wsa_assert (rc != SOCKET_ERROR);
- // This should never happen as we've already checked that command size is
- // less than PIPE_BUF.
- zmq_assert (nbytes == sizeof (command_t));
-}
+ // Bind listening socket to any free local port.
+ struct sockaddr_in addr;
+ memset (&addr, 0, sizeof (addr));
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
+ addr.sin_port = 0;
+ rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
+ wsa_assert (rc != SOCKET_ERROR);
-int zmq::signaler_t::recv (command_t *cmd_, bool block_)
-{
- ssize_t nbytes;
- nbytes = ::recv (r, cmd_, sizeof (command_t), block_ ? 0 : MSG_DONTWAIT);
- if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
- return -1;
- errno_assert (nbytes != -1);
+ // Retrieve local port listener is bound to (into addr).
+ int addrlen = sizeof (addr);
+ rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
+ wsa_assert (rc != SOCKET_ERROR);
- // Check whether we haven't got half of command.
- zmq_assert (nbytes == sizeof (command_t));
+ // Listen for incomming connections.
+ rc = listen (listener, 1);
+ wsa_assert (rc != SOCKET_ERROR);
- return 0;
-}
+ // Create the writer socket.
+ *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
+ wsa_assert (*w_ != INVALID_SOCKET);
-#endif
+ // Set TCP_NODELAY on writer socket.
+ rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
+ (char *)&tcp_nodelay, sizeof (tcp_nodelay));
+ wsa_assert (rc != SOCKET_ERROR);
-#if defined ZMQ_HAVE_OPENVMS
+ // Connect writer to the listener.
+ rc = connect (*w_, (sockaddr *) &addr, sizeof (addr));
+ wsa_assert (rc != SOCKET_ERROR);
-int zmq::signaler_t::socketpair (int domain_, int type_, int protocol_,
- int sv_ [2])
-{
- int listener;
- sockaddr_in lcladdr;
- socklen_t lcladdr_len;
- int rc;
- int on = 1;
+ // Accept connection from writer.
+ *r_ = accept (listener, NULL, NULL);
+ wsa_assert (*r_ != INVALID_SOCKET);
+
+ // We don't need the listening socket anymore. Close it.
+ rc = closesocket (listener);
+ wsa_assert (rc != SOCKET_ERROR);
+
+ return 0;
- zmq_assert (type_ == SOCK_STREAM);
+#elif defined ZMQ_HAVE_OPENVMS
- // Fill in the localhost address (127.0.0.1).
+ // Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further,
+ // it does not set the socket options TCP_NODELAY and TCP_NODELACK which
+ // can lead to performance problems.
+ //
+ // The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll
+ // create the socket pair manually.
+ sockaddr_in lcladdr;
memset (&lcladdr, 0, sizeof (lcladdr));
lcladdr.sin_family = AF_INET;
lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
lcladdr.sin_port = 0;
- listener = socket (AF_INET, SOCK_STREAM, 0);
+ int listener = socket (AF_INET, SOCK_STREAM, 0);
errno_assert (listener != -1);
- rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
+ int on = 1;
+ int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
errno_assert (rc != -1);
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
@@ -342,8 +324,8 @@ int zmq::signaler_t::socketpair (int domain_, int type_, int protocol_,
rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
errno_assert (rc != -1);
-
- lcladdr_len = sizeof (lcladdr);
+
+ socklen_t lcladdr_len = sizeof (lcladdr);
rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
errno_assert (rc != -1);
@@ -351,25 +333,34 @@ int zmq::signaler_t::socketpair (int domain_, int type_, int protocol_,
rc = listen (listener, 1);
errno_assert (rc != -1);
- sv_ [0] = socket (AF_INET, SOCK_STREAM, 0);
- errno_assert (rc != -1);
+ *w_ = socket (AF_INET, SOCK_STREAM, 0);
+ errno_assert (*w_ != -1);
- rc = setsockopt (sv_ [0], IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
+ rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
errno_assert (rc != -1);
- rc = setsockopt (sv_ [0], IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
+ rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
errno_assert (rc != -1);
- rc = connect (sv_ [0], (struct sockaddr*) &lcladdr, sizeof (lcladdr));
+ rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
errno_assert (rc != -1);
- sv_ [1] = accept (listener, NULL, NULL);
- errno_assert (sv_ [1] != -1);
+ *r_ = accept (listener, NULL, NULL);
+ errno_assert (*r_ != -1);
close (listener);
return 0;
-}
+
+#else // All other implementations support socketpair()
+
+ int sv [2];
+ int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
+ errno_assert (rc == 0);
+ *w_ = sv [0];
+ *r_ = sv [1];
+ return 0;
#endif
+}
diff --git a/src/signaler.hpp b/src/signaler.hpp
index 3e5ff13..faf3f1f 100644
--- a/src/signaler.hpp
+++ b/src/signaler.hpp
@@ -44,24 +44,14 @@ namespace zmq
private:
-#if defined ZMQ_HAVE_OPENVMS
-
- // Whilst OpenVMS supports socketpair - it maps to AF_INET only.
- // Further, it does not set the socket options TCP_NODELAY and
- // TCP_NODELACK which can lead to performance problems. We'll
- // overload the socketpair function for this class.
- //
- // The bug will be fixed in V5.6 ECO4 and beyond. In the
- // meantime, we'll create the socket pair manually.
- static int socketpair (int domain_, int type_, int protocol_,
- int sv_ [2]);
-#endif
-
// Write & read end of the socketpair.
fd_t w;
fd_t r;
- // Disable copying of fd_signeler object.
+ // Platform-dependent function to create a socketpair.
+ static int make_socketpair (fd_t *r_, fd_t *w_);
+
+ // Disable copying of signaler_t object.
signaler_t (const signaler_t&);
void operator = (const signaler_t&);
};