diff options
-rw-r--r-- | src/config.hpp | 4 | ||||
-rw-r--r-- | src/signaler.cpp | 403 | ||||
-rw-r--r-- | src/signaler.hpp | 18 |
3 files changed, 201 insertions, 224 deletions
diff --git a/src/config.hpp b/src/config.hpp index fb1b0d4..1db3bb6 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -35,10 +35,6 @@ namespace zmq // memory allocation by approximately 99.6% message_pipe_granularity = 256, - // Socketpair send buffer size used by signaler. The default value of - // zero means leave it at the system default. - signaler_sndbuf_size = 0, - // Determines how often does socket poll for new commands when it // still has unprocessed messages to handle. Thus, if it is set to 100, // socket will process 100 inbound messages before doing the poll. diff --git a/src/signaler.cpp b/src/signaler.cpp index 08edb1e..a692078 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -23,15 +23,16 @@ #include "fd.hpp" #include "ip.hpp" -#if defined ZMQ_HAVE_OPENVMS -#include <netinet/tcp.h> -#include <unistd.h> -#elif defined ZMQ_HAVE_WINDOWS +#if defined ZMQ_HAVE_WINDOWS #include "windows.hpp" #else #include <unistd.h> #include <fcntl.h> #include <limits.h> +#include <netinet/tcp.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> #endif zmq::fd_t zmq::signaler_t::get_fd () @@ -43,62 +44,18 @@ zmq::fd_t zmq::signaler_t::get_fd () zmq::signaler_t::signaler_t () { - // Windows have no 'socketpair' function. CreatePipe is no good as pipe - // handles cannot be polled on. Here we create the socketpair by hand. - - struct sockaddr_in addr; - SOCKET listener; - int addrlen = sizeof (addr); - - w = INVALID_SOCKET; - r = INVALID_SOCKET; - - fd_t rcs = (listener = socket (AF_INET, SOCK_STREAM, 0)); - wsa_assert (rcs != INVALID_SOCKET); - - memset (&addr, 0, sizeof (addr)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); - addr.sin_port = 0; - - int rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr)); - wsa_assert (rc != SOCKET_ERROR); - - rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen); - wsa_assert (rc != SOCKET_ERROR); - - // Listen for incomming connections. - rc = listen (listener, 1); - wsa_assert (rc != SOCKET_ERROR); - - // Create the socket. - w = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); - wsa_assert (w != INVALID_SOCKET); - - // Increase signaler SNDBUF if requested in config.hpp. - if (signaler_sndbuf_size) { - int sndbuf = signaler_sndbuf_size; - socklen_t sndbuf_size = sizeof sndbuf; - rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, (const char *)&sndbuf, - sndbuf_size); - errno_assert (rc == 0); - } - - // Connect to the remote peer. - rc = connect (w, (sockaddr *) &addr, sizeof (addr)); - wsa_assert (rc != SOCKET_ERROR); - - // Accept connection from w. - r = accept (listener, NULL, NULL); - wsa_assert (r != INVALID_SOCKET); + // Create the socketpair for signalling. + int rc = make_socketpair (&r, &w); + errno_assert (rc == 0); - // Set the read site of the pair to non-blocking mode. + // Set the writer to non-blocking mode. unsigned long argp = 1; - rc = ioctlsocket (r, FIONBIO, &argp); + rc = ioctlsocket (w, FIONBIO, &argp); wsa_assert (rc != SOCKET_ERROR); - // We don't need the listening socket anymore. Close it. - rc = closesocket (listener); + // Set the reader to non-blocking mode. + argp = 1; + rc = ioctlsocket (r, FIONBIO, &argp); wsa_assert (rc != SOCKET_ERROR); } @@ -113,80 +70,77 @@ zmq::signaler_t::~signaler_t () void zmq::signaler_t::send (const command_t &cmd_) { - // TODO: Note that send is a blocking operation. - // How should we behave if the signal cannot be written to the signaler? - // Even worse: What if half of a command is written? - int rc = ::send (w, (char*) &cmd_, sizeof (command_t), 0); - win_assert (rc != SOCKET_ERROR); - zmq_assert (rc == sizeof (command_t)); + // TODO: Implement SNDBUF auto-resizing as for POSIX platforms. + // In the mean time, the following code with assert if the send() + // call would block. + int nbytes = ::send (w, (char *)&cmd_, sizeof (command_t), 0); + wsa_assert (nbytes != SOCKET_ERROR); + zmq_assert (nbytes == sizeof (command_t)); } int zmq::signaler_t::recv (command_t *cmd_, bool block_) { if (block_) { - - // Switch to blocking mode. + // Set the reader to blocking mode. unsigned long argp = 0; int rc = ioctlsocket (r, FIONBIO, &argp); wsa_assert (rc != SOCKET_ERROR); } - - int err; - int result; - int nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0); + // Attempt to read an entire command. Returns EAGAIN if non-blocking + // and a command is not available. + int err = 0; + int nbytes = ::recv (r, (char *)cmd_, sizeof (command_t), 0); if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) { + // Save value of errno if we wish to pass it to caller. err = EAGAIN; - result = -1; - } - else { - wsa_assert (nbytes != -1); - - // Check whether we haven't got half of a signal. - zmq_assert (nbytes % sizeof (uint32_t) == 0); - - result = 0; } - if (block_) { - - // Switch back to non-blocking mode. + // Re-set the reader to non-blocking mode. unsigned long argp = 1; int rc = ioctlsocket (r, FIONBIO, &argp); wsa_assert (rc != SOCKET_ERROR); } - - if (result == -1) + // If the recv failed, return with the saved errno if set. + if (err != 0) { errno = err; - return result; -} + return -1; + } + // Sanity check for success. + wsa_assert (nbytes != SOCKET_ERROR); -#elif defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX + // Check whether we haven't got half of command. + zmq_assert (nbytes == sizeof (command_t)); + return 0; +} -#include <sys/types.h> -#include <sys/socket.h> +#else // !ZMQ_HAVE_WINDOWS zmq::signaler_t::signaler_t () { - int sv [2]; - int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); +#ifdef PIPE_BUF + // Make sure that command can be written to the socket in atomic fashion. + // If this wasn't guaranteed, commands from different threads would be + // interleaved. + zmq_assert (sizeof (command_t) <= PIPE_BUF); +#endif + + // Create the socketpair for signalling. + int rc = make_socketpair (&r, &w); + errno_assert (rc == 0); + + // Set the writer to non-blocking mode. + int flags = fcntl (w, F_GETFL, 0); + errno_assert (flags >= 0); + rc = fcntl (w, F_SETFL, flags | O_NONBLOCK); errno_assert (rc == 0); - w = sv [0]; - r = sv [1]; +#ifndef MSG_DONTWAIT // Set the reader to non-blocking mode. - int flags = fcntl (r, F_GETFL, 0); - if (flags == -1) - flags = 0; + flags = fcntl (r, F_GETFL, 0); + errno_assert (flags >= 0); rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc != -1); - - // Increase signaler SNDBUF if requested in config.hpp. - if (signaler_sndbuf_size) { - int sndbuf = signaler_sndbuf_size; - socklen_t sndbuf_size = sizeof sndbuf; - rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &sndbuf, sndbuf_size); - errno_assert (rc == 0); - } + errno_assert (rc == 0); +#endif } zmq::signaler_t::~signaler_t () @@ -197,144 +151,172 @@ zmq::signaler_t::~signaler_t () void zmq::signaler_t::send (const command_t &cmd_) { + // Attempt to write an entire command without blocking. ssize_t nbytes; do { nbytes = ::send (w, &cmd_, sizeof (command_t), 0); } while (nbytes == -1 && errno == EINTR); + // Attempt to increase signaler SNDBUF if the send failed. + if (nbytes == -1 && errno == EAGAIN) { + int old_sndbuf, new_sndbuf; + socklen_t sndbuf_size = sizeof old_sndbuf; + // Retrieve current send buffer size. + int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf, + &sndbuf_size); + errno_assert (rc == 0); + new_sndbuf = old_sndbuf * 2; + // Double the new send buffer size. + rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size); + errno_assert (rc == 0); + // Verify that the OS actually honored the request. + rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size); + errno_assert (rc == 0); + zmq_assert (new_sndbuf > old_sndbuf); + // Retry the sending operation; at this point it must succeed. + do { + nbytes = ::send (w, &cmd_, sizeof (command_t), 0); + } while (nbytes == -1 && errno == EINTR); + } errno_assert (nbytes != -1); + + // This should never happen as we've already checked that command size is + // less than PIPE_BUF. zmq_assert (nbytes == sizeof (command_t)); } int zmq::signaler_t::recv (command_t *cmd_, bool block_) { +#ifdef MSG_DONTWAIT + // Attempt to read an entire command. Returns EAGAIN if non-blocking + // mode is requested and a command is not available. + ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), + block_ ? 0 : MSG_DONTWAIT); + if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) + return -1; +#else if (block_) { - // Set the reader to blocking mode. int flags = fcntl (r, F_GETFL, 0); - if (flags == -1) - flags = 0; + errno_assert (flags >= 0); int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK); - errno_assert (rc != -1); + errno_assert (rc == 0); } - - int err; - int result; - ssize_t nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0); + // Attempt to read an entire command. Returns EAGAIN if non-blocking + // and a command is not available. + int err = 0; + ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0); if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) { + // Save value of errno if we wish to pass it to caller. err = errno; - result = -1; - } - else { - zmq_assert (nbytes != -1); - - // Check whether we haven't got half of command. - zmq_assert (nbytes == sizeof (command_t)); - - result = 0; } - - if (block_) { - - // Set the reader to non-blocking mode. + if (block_) { + // Re-set the reader to non-blocking mode. int flags = fcntl (r, F_GETFL, 0); - if (flags == -1) - flags = 0; + errno_assert (flags >= 0); int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc != -1); + errno_assert (rc == 0); } - - if (result == -1) + // If the recv failed, return with the saved errno if set. + if (err != 0) { errno = err; - return result; -} + return -1; + } +#endif + // Sanity check for success. + errno_assert (nbytes != -1); -#else + // Check whether we haven't got half of command. + zmq_assert (nbytes == sizeof (command_t)); -#include <sys/types.h> -#include <sys/socket.h> + return 0; +} -zmq::signaler_t::signaler_t () +#endif + +int zmq::signaler_t::make_socketpair (fd_t *r_, fd_t *w_) { - // Make sure that command can be written to the socket in atomic fashion. - // If this wasn't guaranteed, commands from different threads would be - // interleaved. - zmq_assert (sizeof (command_t) <= PIPE_BUF); +#if defined ZMQ_HAVE_WINDOWS - int sv [2]; - int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); - errno_assert (rc == 0); - w = sv [0]; - r = sv [1]; - - // Increase signaler SNDBUF if requested in config.hpp. - if (signaler_sndbuf_size) { - int sndbuf = signaler_sndbuf_size; - socklen_t sndbuf_size = sizeof sndbuf; - rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &sndbuf, sndbuf_size); - errno_assert (rc == 0); - } -} + // Windows has no 'socketpair' function. CreatePipe is no good as pipe + // handles cannot be polled on. Here we create the socketpair by hand. + *w_ = INVALID_SOCKET; + *r_ = INVALID_SOCKET; -zmq::signaler_t::~signaler_t () -{ - close (w); - close (r); -} + // Create listening socket. + SOCKET listener; + listener = socket (AF_INET, SOCK_STREAM, 0); + wsa_assert (listener != INVALID_SOCKET); -void zmq::signaler_t::send (const command_t &cmd_) -{ - // TODO: Note that send is a blocking operation. - // How should we behave if the command cannot be written to the signaler? - ssize_t nbytes; - do { - nbytes = ::send (w, &cmd_, sizeof (command_t), 0); - } while (nbytes == -1 && errno == EINTR); - errno_assert (nbytes != -1); + // Set SO_REUSEADDR and TCP_NODELAY on listening socket. + BOOL so_reuseaddr = 1; + int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR, + (char *)&so_reuseaddr, sizeof (so_reuseaddr)); + wsa_assert (rc != SOCKET_ERROR); + BOOL tcp_nodelay = 1; + rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, + (char *)&tcp_nodelay, sizeof (tcp_nodelay)); + wsa_assert (rc != SOCKET_ERROR); - // This should never happen as we've already checked that command size is - // less than PIPE_BUF. - zmq_assert (nbytes == sizeof (command_t)); -} + // Bind listening socket to any free local port. + struct sockaddr_in addr; + memset (&addr, 0, sizeof (addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); + addr.sin_port = 0; + rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr)); + wsa_assert (rc != SOCKET_ERROR); -int zmq::signaler_t::recv (command_t *cmd_, bool block_) -{ - ssize_t nbytes; - nbytes = ::recv (r, cmd_, sizeof (command_t), block_ ? 0 : MSG_DONTWAIT); - if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) - return -1; - errno_assert (nbytes != -1); + // Retrieve local port listener is bound to (into addr). + int addrlen = sizeof (addr); + rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen); + wsa_assert (rc != SOCKET_ERROR); - // Check whether we haven't got half of command. - zmq_assert (nbytes == sizeof (command_t)); + // Listen for incomming connections. + rc = listen (listener, 1); + wsa_assert (rc != SOCKET_ERROR); - return 0; -} + // Create the writer socket. + *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); + wsa_assert (*w_ != INVALID_SOCKET); -#endif + // Set TCP_NODELAY on writer socket. + rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, + (char *)&tcp_nodelay, sizeof (tcp_nodelay)); + wsa_assert (rc != SOCKET_ERROR); -#if defined ZMQ_HAVE_OPENVMS + // Connect writer to the listener. + rc = connect (*w_, (sockaddr *) &addr, sizeof (addr)); + wsa_assert (rc != SOCKET_ERROR); -int zmq::signaler_t::socketpair (int domain_, int type_, int protocol_, - int sv_ [2]) -{ - int listener; - sockaddr_in lcladdr; - socklen_t lcladdr_len; - int rc; - int on = 1; + // Accept connection from writer. + *r_ = accept (listener, NULL, NULL); + wsa_assert (*r_ != INVALID_SOCKET); + + // We don't need the listening socket anymore. Close it. + rc = closesocket (listener); + wsa_assert (rc != SOCKET_ERROR); + + return 0; - zmq_assert (type_ == SOCK_STREAM); +#elif defined ZMQ_HAVE_OPENVMS - // Fill in the localhost address (127.0.0.1). + // Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further, + // it does not set the socket options TCP_NODELAY and TCP_NODELACK which + // can lead to performance problems. + // + // The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll + // create the socket pair manually. + sockaddr_in lcladdr; memset (&lcladdr, 0, sizeof (lcladdr)); lcladdr.sin_family = AF_INET; lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); lcladdr.sin_port = 0; - listener = socket (AF_INET, SOCK_STREAM, 0); + int listener = socket (AF_INET, SOCK_STREAM, 0); errno_assert (listener != -1); - rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); + int on = 1; + int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); errno_assert (rc != -1); rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); @@ -342,8 +324,8 @@ int zmq::signaler_t::socketpair (int domain_, int type_, int protocol_, rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); errno_assert (rc != -1); - - lcladdr_len = sizeof (lcladdr); + + socklen_t lcladdr_len = sizeof (lcladdr); rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len); errno_assert (rc != -1); @@ -351,25 +333,34 @@ int zmq::signaler_t::socketpair (int domain_, int type_, int protocol_, rc = listen (listener, 1); errno_assert (rc != -1); - sv_ [0] = socket (AF_INET, SOCK_STREAM, 0); - errno_assert (rc != -1); + *w_ = socket (AF_INET, SOCK_STREAM, 0); + errno_assert (*w_ != -1); - rc = setsockopt (sv_ [0], IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); + rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); errno_assert (rc != -1); - rc = setsockopt (sv_ [0], IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); + rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); errno_assert (rc != -1); - rc = connect (sv_ [0], (struct sockaddr*) &lcladdr, sizeof (lcladdr)); + rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); errno_assert (rc != -1); - sv_ [1] = accept (listener, NULL, NULL); - errno_assert (sv_ [1] != -1); + *r_ = accept (listener, NULL, NULL); + errno_assert (*r_ != -1); close (listener); return 0; -} + +#else // All other implementations support socketpair() + + int sv [2]; + int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); + errno_assert (rc == 0); + *w_ = sv [0]; + *r_ = sv [1]; + return 0; #endif +} diff --git a/src/signaler.hpp b/src/signaler.hpp index 3e5ff13..faf3f1f 100644 --- a/src/signaler.hpp +++ b/src/signaler.hpp @@ -44,24 +44,14 @@ namespace zmq private: -#if defined ZMQ_HAVE_OPENVMS - - // Whilst OpenVMS supports socketpair - it maps to AF_INET only. - // Further, it does not set the socket options TCP_NODELAY and - // TCP_NODELACK which can lead to performance problems. We'll - // overload the socketpair function for this class. - // - // The bug will be fixed in V5.6 ECO4 and beyond. In the - // meantime, we'll create the socket pair manually. - static int socketpair (int domain_, int type_, int protocol_, - int sv_ [2]); -#endif - // Write & read end of the socketpair. fd_t w; fd_t r; - // Disable copying of fd_signeler object. + // Platform-dependent function to create a socketpair. + static int make_socketpair (fd_t *r_, fd_t *w_); + + // Disable copying of signaler_t object. signaler_t (const signaler_t&); void operator = (const signaler_t&); }; |