summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Lucina <mato@kotelna.sk>2011-09-08 16:38:46 +0200
committerMartin Lucina <martin@lucina.net>2012-01-23 08:54:16 +0100
commit88f712b4a31c95caef4f34c4ec65793c392314a6 (patch)
tree2f17936ce9d53b1b13908e8d4288846aaa604263 /src
parentad3e013f74d309b86e8f087932203e5787fe2d2d (diff)
parent75af6aed482ab16997c1388fe801f74d11ec12a4 (diff)
Imported Debian patch 2.1.9-1debian/2.1.9-1
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am2
-rw-r--r--src/Makefile.in28
-rw-r--r--src/config.hpp7
-rw-r--r--src/ctx.cpp14
-rw-r--r--src/io_thread.cpp2
-rw-r--r--src/ip.cpp20
-rw-r--r--src/mailbox.cpp366
-rw-r--r--src/mailbox.hpp26
-rw-r--r--src/pgm_receiver.cpp4
-rw-r--r--src/pgm_sender.cpp8
-rw-r--r--src/pgm_socket.cpp8
-rw-r--r--src/pgm_socket.hpp7
-rw-r--r--src/platform.hpp.in3
-rw-r--r--src/reaper.cpp2
-rw-r--r--src/rep.cpp2
-rw-r--r--src/select.cpp9
-rw-r--r--src/signaler.cpp344
-rw-r--r--src/signaler.hpp63
-rw-r--r--src/socket_base.cpp79
-rw-r--r--src/socket_base.hpp6
-rw-r--r--src/swap.cpp8
-rw-r--r--src/tcp_connecter.cpp3
-rw-r--r--src/tcp_socket.cpp5
-rw-r--r--src/thread.cpp16
-rw-r--r--src/windows.hpp152
-rw-r--r--src/xrep.cpp4
-rw-r--r--src/zmq.cpp24
27 files changed, 753 insertions, 459 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index ce20225..73eaf44 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -58,6 +58,7 @@ libzmq_la_SOURCES = \
select.hpp \
semaphore.hpp \
session.hpp \
+ signaler.hpp \
socket_base.hpp \
stdint.hpp \
sub.hpp \
@@ -118,6 +119,7 @@ libzmq_la_SOURCES = \
req.cpp \
select.cpp \
session.cpp \
+ signaler.cpp \
socket_base.cpp \
sub.cpp \
swap.cpp \
diff --git a/src/Makefile.in b/src/Makefile.in
index 78e3379..f7a9436 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -93,14 +93,15 @@ am_libzmq_la_OBJECTS = libzmq_la-clock.lo libzmq_la-command.lo \
libzmq_la-poller_base.lo libzmq_la-pull.lo libzmq_la-push.lo \
libzmq_la-reaper.lo libzmq_la-pub.lo libzmq_la-rep.lo \
libzmq_la-req.lo libzmq_la-select.lo libzmq_la-session.lo \
- libzmq_la-socket_base.lo libzmq_la-sub.lo libzmq_la-swap.lo \
- libzmq_la-tcp_connecter.lo libzmq_la-tcp_listener.lo \
- libzmq_la-tcp_socket.lo libzmq_la-thread.lo \
- libzmq_la-transient_session.lo libzmq_la-trie.lo \
- libzmq_la-uuid.lo libzmq_la-xpub.lo libzmq_la-xrep.lo \
- libzmq_la-xreq.lo libzmq_la-xsub.lo libzmq_la-zmq.lo \
- libzmq_la-zmq_connecter.lo libzmq_la-zmq_engine.lo \
- libzmq_la-zmq_init.lo libzmq_la-zmq_listener.lo
+ libzmq_la-signaler.lo libzmq_la-socket_base.lo \
+ libzmq_la-sub.lo libzmq_la-swap.lo libzmq_la-tcp_connecter.lo \
+ libzmq_la-tcp_listener.lo libzmq_la-tcp_socket.lo \
+ libzmq_la-thread.lo libzmq_la-transient_session.lo \
+ libzmq_la-trie.lo libzmq_la-uuid.lo libzmq_la-xpub.lo \
+ libzmq_la-xrep.lo libzmq_la-xreq.lo libzmq_la-xsub.lo \
+ libzmq_la-zmq.lo libzmq_la-zmq_connecter.lo \
+ libzmq_la-zmq_engine.lo libzmq_la-zmq_init.lo \
+ libzmq_la-zmq_listener.lo
libzmq_la_OBJECTS = $(am_libzmq_la_OBJECTS)
AM_V_lt = $(am__v_lt_$(V))
am__v_lt_ = $(am__v_lt_$(AM_DEFAULT_VERBOSITY))
@@ -350,6 +351,7 @@ libzmq_la_SOURCES = \
select.hpp \
semaphore.hpp \
session.hpp \
+ signaler.hpp \
socket_base.hpp \
stdint.hpp \
sub.hpp \
@@ -410,6 +412,7 @@ libzmq_la_SOURCES = \
req.cpp \
select.cpp \
session.cpp \
+ signaler.cpp \
socket_base.cpp \
sub.cpp \
swap.cpp \
@@ -566,6 +569,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-req.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-select.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-session.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-signaler.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-socket_base.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-sub.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-swap.Plo@am__quote@
@@ -906,6 +910,14 @@ libzmq_la-session.lo: session.cpp
@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
@am__fastdepCXX_FALSE@ $(LIBTOOL) $(AM_V_lt) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(libzmq_la_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-session.lo `test -f 'session.cpp' || echo '$(srcdir)/'`session.cpp
+libzmq_la-signaler.lo: signaler.cpp
+@am__fastdepCXX_TRUE@ $(AM_V_CXX)$(LIBTOOL) $(AM_V_lt) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(libzmq_la_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-signaler.lo -MD -MP -MF $(DEPDIR)/libzmq_la-signaler.Tpo -c -o libzmq_la-signaler.lo `test -f 'signaler.cpp' || echo '$(srcdir)/'`signaler.cpp
+@am__fastdepCXX_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/libzmq_la-signaler.Tpo $(DEPDIR)/libzmq_la-signaler.Plo
+@am__fastdepCXX_FALSE@ $(AM_V_CXX) @AM_BACKSLASH@
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='signaler.cpp' object='libzmq_la-signaler.lo' libtool=yes @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCXX_FALSE@ $(LIBTOOL) $(AM_V_lt) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(libzmq_la_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-signaler.lo `test -f 'signaler.cpp' || echo '$(srcdir)/'`signaler.cpp
+
libzmq_la-socket_base.lo: socket_base.cpp
@am__fastdepCXX_TRUE@ $(AM_V_CXX)$(LIBTOOL) $(AM_V_lt) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(libzmq_la_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-socket_base.lo -MD -MP -MF $(DEPDIR)/libzmq_la-socket_base.Tpo -c -o libzmq_la-socket_base.lo `test -f 'socket_base.cpp' || echo '$(srcdir)/'`socket_base.cpp
@am__fastdepCXX_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/libzmq_la-socket_base.Tpo $(DEPDIR)/libzmq_la-socket_base.Plo
diff --git a/src/config.hpp b/src/config.hpp
index f144512..db79284 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -36,6 +36,13 @@ namespace zmq
// memory allocation by approximately 99.6%
message_pipe_granularity = 256,
+ // Commands in pipe per allocation event.
+ command_pipe_granularity = 16,
+
+ // Size in bytes of the largest message that is still copied around
+ // rather than being reference-counted.
+ max_vsm_size = 29,
+
// Determines how often does socket poll for new commands when it
// still has unprocessed messages to handle. Thus, if it is set to 100,
// socket will process 100 inbound messages before doing the poll.
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 2758729..9612cd1 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -18,21 +18,23 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include "platform.hpp"
+#if defined ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#endif
+
#include <new>
#include <string.h>
#include "ctx.hpp"
#include "socket_base.hpp"
#include "io_thread.hpp"
-#include "platform.hpp"
#include "reaper.hpp"
#include "err.hpp"
#include "pipe.hpp"
-#if defined ZMQ_HAVE_WINDOWS
-#include "windows.h"
-#else
-#include "unistd.h"
+#if !defined ZMQ_HAVE_WINDOWS
+#include <unistd.h>
#endif
zmq::ctx_t::ctx_t (uint32_t io_threads_) :
@@ -141,7 +143,7 @@ int zmq::ctx_t::terminate ()
// Wait till reaper thread closes all the sockets.
command_t cmd;
- int rc = term_mailbox.recv (&cmd, true);
+ int rc = term_mailbox.recv (&cmd, -1);
if (rc == -1 && errno == EINTR)
return -1;
zmq_assert (rc == 0);
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index be52bdd..bb7971d 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -72,7 +72,7 @@ void zmq::io_thread_t::in_event ()
// Get the next command. If there is none, exit.
command_t cmd;
- int rc = mailbox.recv (&cmd, false);
+ int rc = mailbox.recv (&cmd, 0);
if (rc != 0 && errno == EINTR)
continue;
if (rc != 0 && errno == EAGAIN)
diff --git a/src/ip.cpp b/src/ip.cpp
index a63a97d..3ea1f29 100644
--- a/src/ip.cpp
+++ b/src/ip.cpp
@@ -54,7 +54,7 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_)
size_t ifr_size = sizeof (struct lifreq) * ifn.lifn_count;
char *ifr = (char*) malloc (ifr_size);
alloc_assert (ifr);
-
+
// Retrieve interface names.
lifconf ifc;
ifc.lifc_family = AF_UNSPEC;
@@ -92,7 +92,7 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_)
return 0;
}
-#elif defined ZMQ_HAVE_AIX || ZMQ_HAVE_HPUX
+#elif defined ZMQ_HAVE_AIX || ZMQ_HAVE_HPUX || ZMQ_HAVE_ANDROID
#include <sys/types.h>
#include <unistd.h>
@@ -105,7 +105,7 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_)
int sd = socket (AF_INET, SOCK_DGRAM, 0);
zmq_assert (sd != -1);
- struct ifreq ifr;
+ struct ifreq ifr;
// Copy interface name for ioctl get.
strncpy (ifr.ifr_name, interface_, sizeof (ifr.ifr_name));
@@ -123,7 +123,7 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_)
struct sockaddr *sa = (struct sockaddr *) &ifr.ifr_addr;
*addr_ = ((sockaddr_in*)sa)->sin_addr;
- return 0;
+ return 0;
}
#elif ((defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
@@ -140,14 +140,14 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_)
// Get the addresses.
ifaddrs* ifa = NULL;
int rc = getifaddrs (&ifa);
- zmq_assert (rc == 0);
+ zmq_assert (rc == 0);
zmq_assert (ifa != NULL);
// Find the corresponding network interface.
bool found = false;
for (ifaddrs *ifp = ifa; ifp != NULL ;ifp = ifp->ifa_next)
- if (ifp->ifa_addr && ifp->ifa_addr->sa_family == AF_INET
- && !strcmp (interface_, ifp->ifa_name))
+ if (ifp->ifa_addr && ifp->ifa_addr->sa_family == AF_INET
+ && !strcmp (interface_, ifp->ifa_name))
{
*addr_ = ((sockaddr_in*) ifp->ifa_addr)->sin_addr;
found = true;
@@ -294,7 +294,7 @@ int zmq::resolve_ip_hostname (sockaddr_storage *addr_, socklen_t *addr_len_,
// Need to choose one to avoid duplicate results from getaddrinfo() - this
// doesn't really matter, since it's not included in the addr-output.
req.ai_socktype = SOCK_STREAM;
-
+
// Avoid named services due to unclear socktype.
req.ai_flags = AI_NUMERICSERV;
@@ -311,9 +311,9 @@ int zmq::resolve_ip_hostname (sockaddr_storage *addr_, socklen_t *addr_len_,
zmq_assert ((size_t) (res->ai_addrlen) <= sizeof (*addr_));
memcpy (addr_, res->ai_addr, res->ai_addrlen);
*addr_len_ = res->ai_addrlen;
-
+
freeaddrinfo (res);
-
+
return 0;
}
diff --git a/src/mailbox.cpp b/src/mailbox.cpp
index 221396b..a99a9ec 100644
--- a/src/mailbox.cpp
+++ b/src/mailbox.cpp
@@ -21,362 +21,62 @@
#include "mailbox.hpp"
#include "platform.hpp"
#include "err.hpp"
-#include "fd.hpp"
-#include "ip.hpp"
-
-#if defined ZMQ_HAVE_WINDOWS
-#include "windows.hpp"
-#else
-#include <unistd.h>
-#include <fcntl.h>
-#include <limits.h>
-#include <netinet/tcp.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#endif
-
-zmq::fd_t zmq::mailbox_t::get_fd ()
-{
- return r;
-}
-
-#if defined ZMQ_HAVE_WINDOWS
zmq::mailbox_t::mailbox_t ()
{
- // Create the socketpair for signalling.
- int rc = make_socketpair (&r, &w);
- errno_assert (rc == 0);
-
- // Set the writer to non-blocking mode.
- unsigned long argp = 1;
- rc = ioctlsocket (w, FIONBIO, &argp);
- wsa_assert (rc != SOCKET_ERROR);
-
- // Set the reader to non-blocking mode.
- argp = 1;
- rc = ioctlsocket (r, FIONBIO, &argp);
- wsa_assert (rc != SOCKET_ERROR);
+ // Get the pipe into passive state. That way, if the users starts by
+ // polling on the associated file descriptor it will get woken up when
+ // new command is posted.
+ bool ok = cpipe.read (NULL);
+ zmq_assert (!ok);
+ active = false;
}
zmq::mailbox_t::~mailbox_t ()
{
- int rc = closesocket (w);
- wsa_assert (rc != SOCKET_ERROR);
-
- rc = closesocket (r);
- wsa_assert (rc != SOCKET_ERROR);
-}
-
-void zmq::mailbox_t::send (const command_t &cmd_)
-{
- // TODO: Implement SNDBUF auto-resizing as for POSIX platforms.
- // In the mean time, the following code with assert if the send()
- // call would block.
- int nbytes = ::send (w, (char *)&cmd_, sizeof (command_t), 0);
- wsa_assert (nbytes != SOCKET_ERROR);
- zmq_assert (nbytes == sizeof (command_t));
-}
-
-int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
-{
- // If required, set the reader to blocking mode.
- if (block_) {
- unsigned long argp = 0;
- int rc = ioctlsocket (r, FIONBIO, &argp);
- wsa_assert (rc != SOCKET_ERROR);
- }
-
- // Attempt to read an entire command. Returns EAGAIN if non-blocking
- // and a command is not available. Save value of errno if we wish to pass
- // it to caller.
- int err = 0;
- int nbytes = ::recv (r, (char *)cmd_, sizeof (command_t), 0);
- if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK)
- err = EAGAIN;
-
- // Re-set the reader to non-blocking mode.
- if (block_) {
- unsigned long argp = 1;
- int rc = ioctlsocket (r, FIONBIO, &argp);
- wsa_assert (rc != SOCKET_ERROR);
- }
-
- // If the recv failed, return with the saved errno.
- if (err != 0) {
- errno = err;
- return -1;
- }
-
- // Sanity check for success.
- wsa_assert (nbytes != SOCKET_ERROR);
-
- // Check whether we haven't got half of command.
- zmq_assert (nbytes == sizeof (command_t));
-
- return 0;
-}
-
-#else
-
-zmq::mailbox_t::mailbox_t ()
-{
-#ifdef PIPE_BUF
- // Make sure that command can be written to the socket in atomic fashion.
- // If this wasn't guaranteed, commands from different threads would be
- // interleaved.
- zmq_assert (sizeof (command_t) <= PIPE_BUF);
-#endif
-
- // Create the socketpair for signaling.
- int rc = make_socketpair (&r, &w);
- errno_assert (rc == 0);
-
- // Set the writer to non-blocking mode.
- int flags = fcntl (w, F_GETFL, 0);
- errno_assert (flags >= 0);
- rc = fcntl (w, F_SETFL, flags | O_NONBLOCK);
- errno_assert (rc == 0);
-
-#ifndef MSG_DONTWAIT
- // Set the reader to non-blocking mode.
- flags = fcntl (r, F_GETFL, 0);
- errno_assert (flags >= 0);
- rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
- errno_assert (rc == 0);
-#endif
+ // TODO: Retrieve and deallocate commands inside the cpipe.
}
-zmq::mailbox_t::~mailbox_t ()
+zmq::fd_t zmq::mailbox_t::get_fd ()
{
- close (w);
- close (r);
+ return signaler.get_fd ();
}
void zmq::mailbox_t::send (const command_t &cmd_)
{
- // Attempt to write an entire command without blocking.
- ssize_t nbytes;
- do {
- nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
- } while (nbytes == -1 && errno == EINTR);
-
- // Attempt to increase mailbox SNDBUF if the send failed.
- if (nbytes == -1 && errno == EAGAIN) {
- int old_sndbuf, new_sndbuf;
- socklen_t sndbuf_size = sizeof old_sndbuf;
-
- // Retrieve current send buffer size.
- int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf,
- &sndbuf_size);
- errno_assert (rc == 0);
- new_sndbuf = old_sndbuf * 2;
-
- // Double the new send buffer size.
- rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size);
- errno_assert (rc == 0);
-
- // Verify that the OS actually honored the request.
- rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size);
- errno_assert (rc == 0);
- zmq_assert (new_sndbuf > old_sndbuf);
-
- // Retry the sending operation; at this point it must succeed.
- do {
- nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
- } while (nbytes == -1 && errno == EINTR);
- }
- errno_assert (nbytes != -1);
-
- // This should never happen as we've already checked that command size is
- // less than PIPE_BUF.
- zmq_assert (nbytes == sizeof (command_t));
+ sync.lock ();
+ cpipe.write (cmd_, false);
+ bool ok = cpipe.flush ();
+ sync.unlock ();
+ if (!ok)
+ signaler.send ();
}
-int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
+int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
{
-#ifdef MSG_DONTWAIT
-
- // Attempt to read an entire command. Returns EAGAIN if non-blocking
- // mode is requested and a command is not available.
- ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),
- block_ ? 0 : MSG_DONTWAIT);
- if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
- return -1;
-#else
-
- // If required, set the reader to blocking mode.
- if (block_) {
- int flags = fcntl (r, F_GETFL, 0);
- errno_assert (flags >= 0);
- int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
- errno_assert (rc == 0);
- }
-
- // Attempt to read an entire command. Returns EAGAIN if non-blocking
- // and a command is not available. Save value of errno if we wish to pass
- // it to caller.
- int err = 0;
- ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
- if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
- err = errno;
-
- // Re-set the reader to non-blocking mode.
- if (block_) {
- int flags = fcntl (r, F_GETFL, 0);
- errno_assert (flags >= 0);
- int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
- errno_assert (rc == 0);
+ // Try to get the command straight away.
+ if (active) {
+ bool ok = cpipe.read (cmd_);
+ if (ok)
+ return 0;
+
+ // If there are no more commands available, switch into passive state.
+ active = false;
+ signaler.recv ();
}
- // If the recv failed, return with the saved errno if set.
- if (err != 0) {
- errno = err;
+ // Wait for signal from the command sender.
+ int rc = signaler.wait (timeout_);
+ if (rc != 0 && (errno == EAGAIN || errno == EINTR))
return -1;
- }
-
-#endif
-
- // Sanity check for success.
- errno_assert (nbytes != -1);
-
- // Check whether we haven't got half of command.
- zmq_assert (nbytes == sizeof (command_t));
-
- return 0;
-}
-
-#endif
-
-int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_)
-{
-#if defined ZMQ_HAVE_WINDOWS
-
- // Windows has no 'socketpair' function. CreatePipe is no good as pipe
- // handles cannot be polled on. Here we create the socketpair by hand.
- *w_ = INVALID_SOCKET;
- *r_ = INVALID_SOCKET;
-
- // Create listening socket.
- SOCKET listener;
- listener = socket (AF_INET, SOCK_STREAM, 0);
- wsa_assert (listener != INVALID_SOCKET);
-
- // Set SO_REUSEADDR and TCP_NODELAY on listening socket.
- BOOL so_reuseaddr = 1;
- int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
- (char *)&so_reuseaddr, sizeof (so_reuseaddr));
- wsa_assert (rc != SOCKET_ERROR);
- BOOL tcp_nodelay = 1;
- rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
- (char *)&tcp_nodelay, sizeof (tcp_nodelay));
- wsa_assert (rc != SOCKET_ERROR);
-
- // Bind listening socket to any free local port.
- struct sockaddr_in addr;
- memset (&addr, 0, sizeof (addr));
- addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
- addr.sin_port = 0;
- rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
- wsa_assert (rc != SOCKET_ERROR);
-
- // Retrieve local port listener is bound to (into addr).
- int addrlen = sizeof (addr);
- rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
- wsa_assert (rc != SOCKET_ERROR);
-
- // Listen for incomming connections.
- rc = listen (listener, 1);
- wsa_assert (rc != SOCKET_ERROR);
-
- // Create the writer socket.
- *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
- wsa_assert (*w_ != INVALID_SOCKET);
-
- // Set TCP_NODELAY on writer socket.
- rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
- (char *)&tcp_nodelay, sizeof (tcp_nodelay));
- wsa_assert (rc != SOCKET_ERROR);
- // Connect writer to the listener.
- rc = connect (*w_, (sockaddr *) &addr, sizeof (addr));
- wsa_assert (rc != SOCKET_ERROR);
+ // We've got the signal. Now we can switch into active state.
+ active = true;
- // Accept connection from writer.
- *r_ = accept (listener, NULL, NULL);
- wsa_assert (*r_ != INVALID_SOCKET);
-
- // We don't need the listening socket anymore. Close it.
- rc = closesocket (listener);
- wsa_assert (rc != SOCKET_ERROR);
-
- return 0;
-
-#elif defined ZMQ_HAVE_OPENVMS
-
- // Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further,
- // it does not set the socket options TCP_NODELAY and TCP_NODELACK which
- // can lead to performance problems.
- //
- // The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll
- // create the socket pair manually.
- sockaddr_in lcladdr;
- memset (&lcladdr, 0, sizeof (lcladdr));
- lcladdr.sin_family = AF_INET;
- lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
- lcladdr.sin_port = 0;
-
- int listener = socket (AF_INET, SOCK_STREAM, 0);
- errno_assert (listener != -1);
-
- int on = 1;
- int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
- errno_assert (rc != -1);
-
- rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
- errno_assert (rc != -1);
-
- rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
- errno_assert (rc != -1);
-
- socklen_t lcladdr_len = sizeof (lcladdr);
-
- rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
- errno_assert (rc != -1);
-
- rc = listen (listener, 1);
- errno_assert (rc != -1);
-
- *w_ = socket (AF_INET, SOCK_STREAM, 0);
- errno_assert (*w_ != -1);
-
- rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
- errno_assert (rc != -1);
-
- rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
- errno_assert (rc != -1);
-
- rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
- errno_assert (rc != -1);
-
- *r_ = accept (listener, NULL, NULL);
- errno_assert (*r_ != -1);
-
- close (listener);
-
- return 0;
-
-#else // All other implementations support socketpair()
-
- int sv [2];
- int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
+ // Get a command.
errno_assert (rc == 0);
- *w_ = sv [0];
- *r_ = sv [1];
+ bool ok = cpipe.read (cmd_);
+ zmq_assert (ok);
return 0;
-
-#endif
}
diff --git a/src/mailbox.hpp b/src/mailbox.hpp
index 96bf4eb..0675b99 100644
--- a/src/mailbox.hpp
+++ b/src/mailbox.hpp
@@ -24,10 +24,12 @@
#include <stddef.h>
#include "platform.hpp"
+#include "signaler.hpp"
#include "fd.hpp"
-#include "stdint.hpp"
#include "config.hpp"
#include "command.hpp"
+#include "ypipe.hpp"
+#include "mutex.hpp"
namespace zmq
{
@@ -41,16 +43,26 @@ namespace zmq
fd_t get_fd ();
void send (const command_t &cmd_);
- int recv (command_t *cmd_, bool block_);
+ int recv (command_t *cmd_, int timeout_);
private:
- // Write & read end of the socketpair.
- fd_t w;
- fd_t r;
+ // The pipe to store actual commands.
+ typedef ypipe_t <command_t, command_pipe_granularity> cpipe_t;
+ cpipe_t cpipe;
- // Platform-dependent function to create a socketpair.
- static int make_socketpair (fd_t *r_, fd_t *w_);
+ // Signaler to pass signals from writer thread to reader thread.
+ signaler_t signaler;
+
+ // There's only one thread receiving from the mailbox, but there
+ // is arbitrary number of threads sending. Given that ypipe requires
+ // synchronised access on both of its endpoints, we have to synchronise
+ // the sending side.
+ mutex_t sync;
+
+ // True if the underlying pipe is active, ie. when we are allowed to
+ // read commands from it.
+ bool active;
// Disable copying of mailbox_t object.
mailbox_t (const mailbox_t&);
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 4fadadc..7e7da96 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -60,8 +60,8 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_)
{
// Retrieve PGM fds and start polling.
- int socket_fd;
- int waiting_pipe_fd;
+ fd_t socket_fd = retired_fd;
+ fd_t waiting_pipe_fd = retired_fd;
pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
socket_handle = add_fd (socket_fd);
pipe_handle = add_fd (waiting_pipe_fd);
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 4d76433..7a0c951 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -64,10 +64,10 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_)
{
// Alocate 2 fds for PGM socket.
- int downlink_socket_fd = 0;
- int uplink_socket_fd = 0;
- int rdata_notify_fd = 0;
- int pending_notify_fd = 0;
+ fd_t downlink_socket_fd = retired_fd;
+ fd_t uplink_socket_fd = retired_fd;
+ fd_t rdata_notify_fd = retired_fd;
+ fd_t pending_notify_fd = retired_fd;
encoder.set_inout (inout_);
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 5a82907..29ff3e6 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -394,8 +394,8 @@ zmq::pgm_socket_t::~pgm_socket_t ()
// Get receiver fds. receive_fd_ is signaled for incoming packets,
// waiting_pipe_fd_ is signaled for state driven events and data.
-void zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_,
- int *waiting_pipe_fd_)
+void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_,
+ fd_t *waiting_pipe_fd_)
{
socklen_t socklen;
bool rc;
@@ -421,8 +421,8 @@ void zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_,
// receive_fd_ is for incoming back-channel protocol packets.
// rdata_notify_fd_ is raised for waiting repair transmissions.
// pending_notify_fd_ is for state driven events.
-void zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
- int *rdata_notify_fd_, int *pending_notify_fd_)
+void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_,
+ fd_t *rdata_notify_fd_, fd_t *pending_notify_fd_)
{
socklen_t socklen;
bool rc;
diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp
index 9699aed..5ba8387 100644
--- a/src/pgm_socket.hpp
+++ b/src/pgm_socket.hpp
@@ -36,6 +36,7 @@
#include <pgm/in.h>
#endif
+#include "fd.hpp"
#include "options.hpp"
namespace zmq
@@ -56,12 +57,12 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_);
// Get receiver fds and store them into user allocated memory.
- void get_receiver_fds (int *receive_fd_, int *waiting_pipe_fd_);
+ void get_receiver_fds (fd_t *receive_fd_, fd_t *waiting_pipe_fd_);
// Get sender and receiver fds and store it to user allocated
// memory. Receive fd is used to process NAKs from peers.
- void get_sender_fds (int *send_fd_, int *receive_fd_,
- int *rdata_notify_fd_, int *pending_notify_fd_);
+ void get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_,
+ fd_t *rdata_notify_fd_, fd_t *pending_notify_fd_);
// Send data as one APDU, transmit window owned memory.
size_t send (unsigned char *data_, size_t data_len_);
diff --git a/src/platform.hpp.in b/src/platform.hpp.in
index 9ac777f..c89739d 100644
--- a/src/platform.hpp.in
+++ b/src/platform.hpp.in
@@ -160,6 +160,9 @@
/* Have AIX OS */
#undef ZMQ_HAVE_AIX
+/* Have Android OS */
+#undef ZMQ_HAVE_ANDROID
+
/* Have Cygwin */
#undef ZMQ_HAVE_CYGWIN
diff --git a/src/reaper.cpp b/src/reaper.cpp
index d3ebbba..f94f7c1 100644
--- a/src/reaper.cpp
+++ b/src/reaper.cpp
@@ -61,7 +61,7 @@ void zmq::reaper_t::in_event ()
// Get the next command. If there is none, exit.
command_t cmd;
- int rc = mailbox.recv (&cmd, false);
+ int rc = mailbox.recv (&cmd, 0);
if (rc != 0 && errno == EINTR)
continue;
if (rc != 0 && errno == EAGAIN)
diff --git a/src/rep.cpp b/src/rep.cpp
index dc55ad0..58c810b 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -85,7 +85,7 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
// Push it to the reply pipe.
rc = xrep_t::xsend (msg_, flags_);
- zmq_assert (rc == 0);
+ errno_assert (rc == 0);
}
else {
// If the traceback stack is malformed, discard anything
diff --git a/src/select.cpp b/src/select.cpp
index 56f9f74..78fb6f6 100644
--- a/src/select.cpp
+++ b/src/select.cpp
@@ -19,20 +19,21 @@
*/
#include "platform.hpp"
+#if defined ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#endif
#include <string.h>
#include <algorithm>
-#ifdef ZMQ_HAVE_WINDOWS
-#include "winsock2.h"
-#elif defined ZMQ_HAVE_HPUX
+#if defined ZMQ_HAVE_HPUX
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#elif defined ZMQ_HAVE_OPENVMS
#include <sys/types.h>
#include <sys/time.h>
-#else
+#elif !defined ZMQ_HAVE_WINDOWS
#include <sys/select.h>
#endif
diff --git a/src/signaler.cpp b/src/signaler.cpp
new file mode 100644
index 0000000..fa2f123
--- /dev/null
+++ b/src/signaler.cpp
@@ -0,0 +1,344 @@
+/*
+ Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "platform.hpp"
+
+#if defined ZMQ_FORCE_SELECT
+#define ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
+#elif defined ZMQ_FORCE_POLL
+#define ZMQ_SIGNALER_WAIT_BASED_ON_POLL
+#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
+ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
+ defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
+ defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
+ defined ZMQ_HAVE_NETBSD
+#define ZMQ_SIGNALER_WAIT_BASED_ON_POLL
+#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+#define ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
+#endif
+
+// On AIX, poll.h has to be included before zmq.h to get consistent
+// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
+// instead of 'events' and 'revents' and defines macros to map from POSIX-y
+// names to AIX-specific names).
+#if defined ZMQ_SIGNALER_WAIT_BASED_ON_POLL
+#include <poll.h>
+#elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
+#if defined ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#elif defined ZMQ_HAVE_HPUX
+#include <sys/param.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#elif defined ZMQ_HAVE_OPENVMS
+#include <sys/types.h>
+#include <sys/time.h>
+#else
+#include <sys/select.h>
+#endif
+#endif
+
+#include "signaler.hpp"
+#include "likely.hpp"
+#include "err.hpp"
+#include "fd.hpp"
+#include "ip.hpp"
+
+#if defined ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#else
+#include <unistd.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <netinet/tcp.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#endif
+
+zmq::signaler_t::signaler_t ()
+{
+ // Create the socketpair for signaling.
+ int rc = make_fdpair (&r, &w);
+ errno_assert (rc == 0);
+
+ // Set both fds to non-blocking mode.
+#if defined ZMQ_HAVE_WINDOWS
+ unsigned long argp = 1;
+ rc = ioctlsocket (w, FIONBIO, &argp);
+ wsa_assert (rc != SOCKET_ERROR);
+ rc = ioctlsocket (r, FIONBIO, &argp);
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ int flags = fcntl (w, F_GETFL, 0);
+ errno_assert (flags >= 0);
+ rc = fcntl (w, F_SETFL, flags | O_NONBLOCK);
+ errno_assert (rc == 0);
+ flags = fcntl (r, F_GETFL, 0);
+ errno_assert (flags >= 0);
+ rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
+ errno_assert (rc == 0);
+#endif
+}
+
+zmq::signaler_t::~signaler_t ()
+{
+#if defined ZMQ_HAVE_WINDOWS
+ int rc = closesocket (w);
+ wsa_assert (rc != SOCKET_ERROR);
+ rc = closesocket (r);
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ close (w);
+ close (r);
+#endif
+}
+
+zmq::fd_t zmq::signaler_t::get_fd ()
+{
+ return r;
+}
+
+void zmq::signaler_t::send ()
+{
+#if defined ZMQ_HAVE_WINDOWS
+ unsigned char dummy = 0;
+ int nbytes = ::send (w, (char*) &dummy, sizeof (dummy), 0);
+ wsa_assert (nbytes != SOCKET_ERROR);
+ zmq_assert (nbytes == sizeof (dummy));
+#else
+ unsigned char dummy = 0;
+ while (true) {
+ ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
+ if (unlikely (nbytes == -1 && errno == EINTR))
+ continue;
+ zmq_assert (nbytes == sizeof (dummy));
+ break;
+ }
+#endif
+}
+
+int zmq::signaler_t::wait (int timeout_)
+{
+#ifdef ZMQ_SIGNALER_WAIT_BASED_ON_POLL
+
+ struct pollfd pfd;
+ pfd.fd = r;
+ pfd.events = POLLIN;
+ int rc = poll (&pfd, 1, timeout_);
+ if (unlikely (rc < 0)) {
+ zmq_assert (errno == EINTR);
+ return -1;
+ }
+ else if (unlikely (rc == 0)) {
+ errno = EAGAIN;
+ return -1;
+ }
+ zmq_assert (rc == 1);
+ zmq_assert (pfd.revents & POLLIN);
+ return 0;
+
+#elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
+
+ fd_set fds;
+ FD_ZERO (&fds);
+ FD_SET (r, &fds);
+ struct timeval timeout;
+ if (timeout_ >= 0) {
+ timeout.tv_sec = timeout_ / 1000;
+ timeout.tv_usec = timeout_ % 1000 * 1000;
+ }
+#ifdef ZMQ_HAVE_WINDOWS
+ int rc = select (0, &fds, NULL, NULL,
+ timeout_ >= 0 ? &timeout : NULL);
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ int rc = select (r + 1, &fds, NULL, NULL,
+ timeout_ >= 0 ? &timeout : NULL);
+ if (unlikely (rc < 0)) {
+ zmq_assert (errno == EINTR);
+ return -1;
+ }
+#endif
+ if (unlikely (rc == 0)) {
+ errno = EAGAIN;
+ return -1;
+ }
+ zmq_assert (rc == 1);
+ return 0;
+
+#else
+#error
+#endif
+}
+
+void zmq::signaler_t::recv ()
+{
+ // Attempt to read a signal.
+ unsigned char dummy;
+#ifdef ZMQ_HAVE_WINDOWS
+ int nbytes = ::recv (r, (char*) &dummy, sizeof (dummy), 0);
+ wsa_assert (nbytes != SOCKET_ERROR);
+#else
+ ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
+ errno_assert (nbytes >= 0);
+#endif
+ zmq_assert (nbytes == sizeof (dummy));
+ zmq_assert (dummy == 0);
+}
+
+int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
+{
+#if defined ZMQ_HAVE_WINDOWS
+
+ // Windows has no 'socketpair' function. CreatePipe is no good as pipe
+ // handles cannot be polled on. Here we create the socketpair by hand.
+ *w_ = INVALID_SOCKET;
+ *r_ = INVALID_SOCKET;
+
+ // Create listening socket.
+ SOCKET listener;
+ listener = socket (AF_INET, SOCK_STREAM, 0);
+ wsa_assert (listener != INVALID_SOCKET);
+
+ // Set SO_REUSEADDR and TCP_NODELAY on listening socket.
+ BOOL so_reuseaddr = 1;
+ int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
+ (char *)&so_reuseaddr, sizeof (so_reuseaddr));
+ wsa_assert (rc != SOCKET_ERROR);
+ BOOL tcp_nodelay = 1;
+ rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
+ (char *)&tcp_nodelay, sizeof (tcp_nodelay));
+ wsa_assert (rc != SOCKET_ERROR);
+
+ // Bind listening socket to any free local port.
+ struct sockaddr_in addr;
+ memset (&addr, 0, sizeof (addr));
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
+ addr.sin_port = 0;
+ rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
+ wsa_assert (rc != SOCKET_ERROR);
+
+ // Retrieve local port listener is bound to (into addr).
+ int addrlen = sizeof (addr);
+ rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
+ wsa_assert (rc != SOCKET_ERROR);
+
+ // Listen for incomming connections.
+ rc = listen (listener, 1);
+ wsa_assert (rc != SOCKET_ERROR);
+
+ // Create the writer socket.
+ *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
+ wsa_assert (*w_ != INVALID_SOCKET);
+
+ // Set TCP_NODELAY on writer socket.
+ rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
+ (char *)&tcp_nodelay, sizeof (tcp_nodelay));
+ wsa_assert (rc != SOCKET_ERROR);
+
+ // Connect writer to the listener.
+ rc = connect (*w_, (sockaddr *) &addr, sizeof (addr));
+ wsa_assert (rc != SOCKET_ERROR);
+
+ // Accept connection from writer.
+ *r_ = accept (listener, NULL, NULL);
+ wsa_assert (*r_ != INVALID_SOCKET);
+
+ // We don't need the listening socket anymore. Close it.
+ rc = closesocket (listener);
+ wsa_assert (rc != SOCKET_ERROR);
+
+ return 0;
+
+#elif defined ZMQ_HAVE_OPENVMS
+
+ // Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further,
+ // it does not set the socket options TCP_NODELAY and TCP_NODELACK which
+ // can lead to performance problems.
+ //
+ // The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll
+ // create the socket pair manually.
+ sockaddr_in lcladdr;
+ memset (&lcladdr, 0, sizeof (lcladdr));
+ lcladdr.sin_family = AF_INET;
+ lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
+ lcladdr.sin_port = 0;
+
+ int listener = socket (AF_INET, SOCK_STREAM, 0);
+ errno_assert (listener != -1);
+
+ int on = 1;
+ int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
+ errno_assert (rc != -1);
+
+ rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
+ errno_assert (rc != -1);
+
+ rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
+ errno_assert (rc != -1);
+
+ socklen_t lcladdr_len = sizeof (lcladdr);
+
+ rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
+ errno_assert (rc != -1);
+
+ rc = listen (listener, 1);
+ errno_assert (rc != -1);
+
+ *w_ = socket (AF_INET, SOCK_STREAM, 0);
+ errno_assert (*w_ != -1);
+
+ rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
+ errno_assert (rc != -1);
+
+ rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
+ errno_assert (rc != -1);
+
+ rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
+ errno_assert (rc != -1);
+
+ *r_ = accept (listener, NULL, NULL);
+ errno_assert (*r_ != -1);
+
+ close (listener);
+
+ return 0;
+
+#else // All other implementations support socketpair()
+
+ int sv [2];
+ int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
+ errno_assert (rc == 0);
+ *w_ = sv [0];
+ *r_ = sv [1];
+ return 0;
+
+#endif
+}
+
+#if defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
+#undef ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
+#endif
+#if defined ZMQ_SIGNALER_WAIT_BASED_ON_POLL
+#undef ZMQ_SIGNALER_WAIT_BASED_ON_POLL
+#endif
+
diff --git a/src/signaler.hpp b/src/signaler.hpp
new file mode 100644
index 0000000..2ebff41
--- /dev/null
+++ b/src/signaler.hpp
@@ -0,0 +1,63 @@
+/*
+ Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__
+#define __ZMQ_SIGNALER_HPP_INCLUDED__
+
+#include "fd.hpp"
+
+namespace zmq
+{
+
+ // This is a cross-platform equivalent to signal_fd. However, as opposed
+ // to signal_fd there can be at most one signal in the signaler at any
+ // given moment. Attempt to send a signal before receiving the previous
+ // one will result in undefined behaviour.
+
+ class signaler_t
+ {
+ public:
+
+ signaler_t ();
+ ~signaler_t ();
+
+ fd_t get_fd ();
+ void send ();
+ int wait (int timeout_);
+ void recv ();
+
+ private:
+
+ // Creates a pair of filedescriptors that will be used
+ // to pass the signals.
+ static int make_fdpair (fd_t *r_, fd_t *w_);
+
+ // Write & read end of the socketpair.
+ fd_t w;
+ fd_t r;
+
+ // Disable copying of signaler_t object.
+ signaler_t (const signaler_t&);
+ const signaler_t &operator = (const signaler_t&);
+ };
+
+}
+
+#endif
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 24789b8..2167b0b 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -277,7 +277,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
errno = EINVAL;
return -1;
}
- int rc = process_commands (false, false);
+ int rc = process_commands (0, false);
if (rc != 0 && (errno == EINTR || errno == ETERM))
return -1;
errno_assert (rc == 0);
@@ -480,7 +480,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
}
// Process pending commands, if any.
- int rc = process_commands (false, true);
+ int rc = process_commands (0, true);
if (unlikely (rc != 0))
return -1;
@@ -492,20 +492,37 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
rc = xsend (msg_, flags_);
if (rc == 0)
return 0;
+ if (unlikely (errno != EAGAIN))
+ return -1;
// In case of non-blocking send we'll simply propagate
// the error - including EAGAIN - upwards.
if (flags_ & ZMQ_NOBLOCK)
return -1;
+ // Compute the time when the timeout should occur.
+ // If the timeout is infite, don't care.
+ clock_t clock ;
+ int timeout = -1;
+ uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
+
// Oops, we couldn't send the message. Wait for the next
// command, process it and try to send the message again.
- while (rc != 0) {
- if (errno != EAGAIN)
- return -1;
- if (unlikely (process_commands (true, false) != 0))
+ while (true) {
+ if (unlikely (process_commands (timeout, false) != 0))
return -1;
rc = xsend (msg_, flags_);
+ if (rc == 0)
+ break;
+ if (unlikely (errno != EAGAIN))
+ return -1;
+ if (timeout > 0) {
+ timeout = (int) (end - clock.now_ms ());
+ if (timeout <= 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+ }
}
return 0;
}
@@ -526,6 +543,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// Get the message.
int rc = xrecv (msg_, flags_);
+ if (unlikely (rc != 0 && errno != EAGAIN))
+ return -1;
int err = errno;
// Once every inbound_poll_rate messages check for signals and process
@@ -537,7 +556,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// described above) from the one used by 'send'. This is because counting
// ticks is more efficient than doing RDTSC all the time.
if (++ticks == inbound_poll_rate) {
- if (unlikely (process_commands (false, false) != 0))
+ if (unlikely (process_commands (0, false) != 0))
return -1;
ticks = 0;
}
@@ -560,7 +579,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
if (flags_ & ZMQ_NOBLOCK) {
if (errno != EAGAIN)
return -1;
- if (unlikely (process_commands (false, false) != 0))
+ if (unlikely (process_commands (0, false) != 0))
return -1;
ticks = 0;
@@ -573,17 +592,33 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
return rc;
}
+ // Compute the time when the timeout should occur.
+ // If the timeout is infite, don't care.
+ clock_t clock ;
+ int timeout = -1;
+ uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
+
// In blocking scenario, commands are processed over and over again until
// we are able to fetch a message.
bool block = (ticks != 0);
- while (rc != 0) {
- if (errno != EAGAIN)
- return -1;
- if (unlikely (process_commands (block, false) != 0))
+ while (true) {
+ if (unlikely (process_commands (block ? timeout : 0, false) != 0))
return -1;
rc = xrecv (msg_, flags_);
- ticks = 0;
+ if (rc == 0) {
+ ticks = 0;
+ break;
+ }
+ if (unlikely (errno != EAGAIN))
+ return -1;
block = true;
+ if (timeout > 0) {
+ timeout = (int) (end - clock.now_ms ());
+ if (timeout <= 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+ }
}
rcvmore = msg_->flags & ZMQ_MSG_MORE;
@@ -655,18 +690,20 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
poller->set_pollin (handle);
}
-int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
+int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
{
int rc;
command_t cmd;
- if (block_) {
- rc = mailbox.recv (&cmd, true);
- if (rc == -1 && errno == EINTR)
- return -1;
- errno_assert (rc == 0);
+ if (timeout_ != 0) {
+
+ // If we are asked to wait, simply ask mailbox to wait.
+ rc = mailbox.recv (&cmd, timeout_);
}
else {
+ // If we are asked not to wait, check whether we haven't processed
+ // commands recently, so that we can throttle the new commands.
+
// Get the CPU's tick counter. If 0, the counter is not available.
uint64_t tsc = zmq::clock_t::rdtsc ();
@@ -687,7 +724,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
}
// Check whether there are any commands pending for this thread.
- rc = mailbox.recv (&cmd, false);
+ rc = mailbox.recv (&cmd, 0);
}
// Process all the commands available at the moment.
@@ -698,7 +735,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
return -1;
errno_assert (rc == 0);
cmd.destination->process_command (cmd);
- rc = mailbox.recv (&cmd, false);
+ rc = mailbox.recv (&cmd, 0);
}
if (ctx_terminated) {
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 333cddd..145211b 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -162,11 +162,11 @@ namespace zmq
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- // Processes commands sent to this socket (if any). If 'block' is
- // set to true, returns only after at least one command was processed.
+ // Processes commands sent to this socket (if any). If timeout is -1,
+ // returns only after at least one command was processed.
// If throttle argument is true, commands are processed at most once
// in a predefined time period.
- int process_commands (bool block_, bool throttle_);
+ int process_commands (int timeout_, bool throttle_);
// Handlers for incoming commands.
void process_stop ();
diff --git a/src/swap.cpp b/src/swap.cpp
index 936f30e..5f1b261 100644
--- a/src/swap.cpp
+++ b/src/swap.cpp
@@ -111,7 +111,7 @@ int zmq::swap_t::init ()
if (fd == -1)
return -1;
-#ifdef ZMQ_HAVE_LINUX
+#if (defined (ZMQ_HAVE_LINUX) && !defined (ZMQ_HAVE_ANDROID))
// Enable more aggresive read-ahead optimization.
posix_fadvise (fd, 0, filesize, POSIX_FADV_SEQUENTIAL);
#endif
@@ -135,8 +135,6 @@ bool zmq::swap_t::store (zmq_msg_t *msg_)
copy_to_file (&msg_flags, sizeof msg_flags);
copy_to_file (zmq_msg_data (msg_), msg_size);
- zmq_msg_close (msg_);
-
return true;
}
@@ -214,7 +212,7 @@ void zmq::swap_t::copy_from_file (void *buffer_, size_t count_)
size_t chunk_size, remainder = count_;
while (remainder > 0) {
- chunk_size = std::min (remainder,
+ chunk_size = std::min (remainder,
std::min ((size_t) (filesize - read_pos),
(size_t) (block_size - read_pos % block_size)));
@@ -238,7 +236,7 @@ void zmq::swap_t::copy_to_file (const void *buffer_, size_t count_)
size_t chunk_size, remainder = count_;
while (remainder > 0) {
- chunk_size = std::min (remainder,
+ chunk_size = std::min (remainder,
std::min ((size_t) (filesize - write_pos),
(size_t) (block_size - write_pos % block_size)));
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index 71b362b..d6f73ca 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -297,7 +297,8 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
// Networking problems are OK. No need to assert.
errno = err;
errno_assert (errno == ECONNREFUSED || errno == ECONNRESET ||
- errno == ETIMEDOUT || errno == EHOSTUNREACH);
+ errno == ETIMEDOUT || errno == EHOSTUNREACH ||
+ errno == ENETUNREACH);
return retired_fd;
}
diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp
index f1b1d19..e7a69e4 100644
--- a/src/tcp_socket.cpp
+++ b/src/tcp_socket.cpp
@@ -162,6 +162,11 @@ int zmq::tcp_socket_t::open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_)
errno_assert (rc == 0);
}
+#if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD
+ int set = 1;
+ int rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
+ errno_assert (rc == 0);
+#endif
return 0;
}
diff --git a/src/thread.cpp b/src/thread.cpp
index 7bf9df0..8e33eff 100644
--- a/src/thread.cpp
+++ b/src/thread.cpp
@@ -40,7 +40,7 @@ void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
arg =arg_;
descriptor = (HANDLE) _beginthreadex (NULL, 0,
&::thread_routine, this, 0 , NULL);
- win_assert (descriptor != NULL);
+ win_assert (descriptor != NULL);
}
void zmq::thread_t::stop ()
@@ -59,17 +59,19 @@ extern "C"
{
static void *thread_routine (void *arg_)
{
- #if !defined ZMQ_HAVE_OPENVMS
+#if !defined ZMQ_HAVE_OPENVMS
// Following code will guarantee more predictable latecnies as it'll
// disallow any signal handling in the I/O thread.
sigset_t signal_set;
int rc = sigfillset (&signal_set);
errno_assert (rc == 0);
+# if !defined ZMQ_HAVE_ANDROID
rc = pthread_sigmask (SIG_BLOCK, &signal_set, NULL);
- errno_assert (rc == 0);
- #endif
+ posix_assert (rc);
+# endif
+#endif
- zmq::thread_t *self = (zmq::thread_t*) arg_;
+ zmq::thread_t *self = (zmq::thread_t*) arg_;
self->tfn (self->arg);
return NULL;
}
@@ -80,13 +82,13 @@ void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
tfn = tfn_;
arg =arg_;
int rc = pthread_create (&descriptor, NULL, thread_routine, this);
- errno_assert (rc == 0);
+ posix_assert (rc);
}
void zmq::thread_t::stop ()
{
int rc = pthread_join (descriptor, NULL);
- errno_assert (rc == 0);
+ posix_assert (rc);
}
#endif
diff --git a/src/windows.hpp b/src/windows.hpp
index 5133875..a821339 100644
--- a/src/windows.hpp
+++ b/src/windows.hpp
@@ -24,50 +24,148 @@
// The purpose of this header file is to turn on only the items actually needed
// on the windows platform.
-#define _WINSOCKAPI_
-#ifndef NOMINMAX
-#define NOMINMAX // No min and max functions, these clash with C++.
-#endif
+// Disable deprecation warning.
#define _CRT_SECURE_NO_WARNINGS
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
-#ifndef NOUSER // No USER defines and routines.
-#define NOUSER
+#ifndef NOGDICAPMASKS
+#define NOGDICAPMASKS // CC_*, LC_*, PC_*, CP_*, TC_*, RC_
#endif
-#ifndef NOMCX // No Modem Configuration Extensions.
-#define NOMCX
+#ifndef NOVIRTUALKEYCODES
+#define NOVIRTUALKEYCODES // VK_*
#endif
-#ifndef NOIME // No Input Method Editor.
-#define NOIME
+#ifndef NOWINMESSAGES
+#define NOWINMESSAGES // WM_*, EM_*, LB_*, CB_*
#endif
-#ifndef NOSOUND // No Sound driver routines.
-#define NOSOUND
+#ifndef NOWINSTYLES
+#define NOWINSTYLES // WS_*, CS_*, ES_*, LBS_*, SBS_*, CBS_*
#endif
-
-#ifdef ZMQ_HAVE_MINGW32
-#ifdef WINVER
-#undef WINVER
+#ifndef NOSYSMETRICS
+#define NOSYSMETRICS // SM_*
#endif
-#define WINVER 0x0501
+#ifndef NOMENUS
+#define NOMENUS // MF_*
+#endif
+#ifndef NOICONS
+#define NOICONS // IDI_*
+#endif
+#ifndef NOKEYSTATES
+#define NOKEYSTATES // MK_*
+#endif
+#ifndef NOSYSCOMMANDS
+#define NOSYSCOMMANDS // SC_*
+#endif
+#ifndef NORASTEROPS
+#define NORASTEROPS // Binary and Tertiary raster ops
+#endif
+#ifndef NOSHOWWINDOW
+#define NOSHOWWINDOW // SW_*
+#endif
+#ifndef OEMRESOURCE
+#define OEMRESOURCE // OEM Resource values
+#endif
+#ifndef NOATOM
+#define NOATOM // Atom Manager routines
+#endif
+#ifndef NOCLIPBOARD
+#define NOCLIPBOARD // Clipboard routines
+#endif
+#ifndef NOCOLOR
+#define NOCOLOR // Screen colors
+#endif
+#ifndef NOCTLMGR
+#define NOCTLMGR // Control and Dialog routines
+#endif
+#ifndef NODRAWTEXT
+#define NODRAWTEXT // DrawText() and DT_*
+#endif
+#ifndef NOGDI
+#define NOGDI // All GDI defines and routines
+#endif
+#ifndef NOKERNEL
+#define NOKERNEL // All KERNEL defines and routines
+#endif
+#ifndef NOUSER
+#define NOUSER // All USER defines and routines
+#endif
+#ifndef NONLS
+#define NONLS // All NLS defines and routines
+#endif
+#ifndef NOMB
+#define NOMB // MB_* and MessageBox()
+#endif
+#ifndef NOMEMMGR
+#define NOMEMMGR // GMEM_*, LMEM_*, GHND, LHND, associated routines
+#endif
+#ifndef NOMETAFILE
+#define NOMETAFILE // typedef METAFILEPICT
+#endif
+#ifndef NOMINMAX
+#define NOMINMAX // Macros min(a,b) and max(a,b)
+#endif
+#ifndef NOMSG
+#define NOMSG // typedef MSG and associated routines
+#endif
+#ifndef NOOPENFILE
+#define NOOPENFILE // OpenFile(), OemToAnsi, AnsiToOem, and OF_*
+#endif
+#ifndef NOSCROLL
+#define NOSCROLL // SB_* and scrolling routines
+#endif
+#ifndef NOSERVICE
+#define NOSERVICE // All Service Controller routines, SERVICE_ equates, etc.
+#endif
+#ifndef NOSOUND
+#define NOSOUND // Sound driver routines
+#endif
+#ifndef NOTEXTMETRIC
+#define NOTEXTMETRIC // typedef TEXTMETRIC and associated routines
+#endif
+#ifndef NOWH
+#define NOWH // SetWindowsHook and WH_*
+#endif
+#ifndef NOWINOFFSETS
+#define NOWINOFFSETS // GWL_*, GCL_*, associated routines
+#endif
+#ifndef NOCOMM
+#define NOCOMM // COMM driver routines
+#endif
+#ifndef NOKANJI
+#define NOKANJI // Kanji support stuff.
+#endif
+#ifndef NOHELP
+#define NOHELP // Help engine interface.
+#endif
+#ifndef NOPROFILER
+#define NOPROFILER // Profiler interface.
+#endif
+#ifndef NODEFERWINDOWPOS
+#define NODEFERWINDOWPOS // DeferWindowPos routines
+#endif
+#ifndef NOMCX
+#define NOMCX // Modem Configuration ExtensionsA
#endif
-#include <windows.h>
-
-// MSVC++ 2005 on Win2000 does not define _WIN32_WINNT.
+// Set target version to Windows Server 2003, Windows XP/SP1 or higher.
#ifndef _WIN32_WINNT
-#define _WIN32_WINNT WINVER
+#define _WIN32_WINNT 0x0501
#endif
-// Enable winsock (not included when WIN32_LEAN_AND_MEAN is defined).
-#if(_WIN32_WINNT >= 0x0400)
-#include <winsock2.h>
-#include <mswsock.h>
+#include <windows.h>
+
+#ifdef __MINGW32__
+// Require Windows XP or higher with MinGW for getaddrinfo().
+#if(_WIN32_WINNT >= 0x0501)
#else
-#include <winsock.h>
-#endif
+#undef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif /* _WIN32_WINNT >= 0x0501 */
+#endif /* __MINGW32__ */
+#include <winsock2.h>
+#include <mswsock.h>
#include <ws2tcpip.h>
#include <ipexport.h>
#include <process.h>
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 7317056..ac4150d 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -189,10 +189,6 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
it->second.active = false;
more_out = false;
current_out = NULL;
- rc = zmq_msg_close (&empty);
- zmq_assert (rc == 0);
- errno = EAGAIN;
- return -1;
}
rc = zmq_msg_close (&empty);
zmq_assert (rc == 0);
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 85f7d62..dce9630 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -412,7 +412,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
Sleep (timeout_ > 0 ? timeout_ / 1000 : INFINITE);
return 0;
#else
- return usleep (timeout_);
+ usleep (timeout_);
+ return 0;
#endif
}
@@ -565,7 +566,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
Sleep (timeout_ > 0 ? timeout_ / 1000 : INFINITE);
return 0;
#else
- return usleep (timeout_);
+ usleep (timeout_);
+ return 0;
#endif
}
@@ -649,13 +651,21 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
memcpy (&inset, &pollset_in, sizeof (fd_set));
memcpy (&outset, &pollset_out, sizeof (fd_set));
memcpy (&errset, &pollset_err, sizeof (fd_set));
- int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
#if defined ZMQ_HAVE_WINDOWS
- wsa_assert (rc != SOCKET_ERROR);
+ int rc = select (0, &inset, &outset, &errset, ptimeout);
+ if (unlikely (rc == SOCKET_ERROR)) {
+ zmq::wsa_error_to_errno ();
+ if (errno == ENOTSOCK)
+ return -1;
+ wsa_assert (false);
+ }
#else
- if (rc == -1 && errno == EINTR)
- return -1;
- errno_assert (rc >= 0);
+ int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
+ if (unlikely (rc == -1) {
+ if (errno == EINTR || errno == EBADF)
+ return -1;
+ errno_assert (false);
+ }
#endif
break;
}