summaryrefslogtreecommitdiff
path: root/src/signaler.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-04-14 10:08:19 +0200
committerMartin Sustrik <sustrik@250bpm.com>2012-04-15 06:55:29 +0200
commitee66c579dedf7130aa4d59afbf373f28c98eead5 (patch)
tree2b29d8e236b02789877da88ecf05d5b463c046da /src/signaler.cpp
parent19894e0a1b6fbbcb62028fc6513ef3904a6f5c76 (diff)
Report EMFILE/ENFILE from xs_socket()
This patch propoagates the error from signaler and mailbox initialisation up the stack. To achieve this signaler and mailbox classes were re-written is C-like syntax. Finally, shutdown_stress test now ignores EMFILE/ENFILE errors. Thus, the tests should pass even on OSX which sets the max number of file descriptors pretty low by default. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/signaler.cpp')
-rw-r--r--src/signaler.cpp366
1 files changed, 199 insertions, 167 deletions
diff --git a/src/signaler.cpp b/src/signaler.cpp
index 78a13b2..af905a3 100644
--- a/src/signaler.cpp
+++ b/src/signaler.cpp
@@ -83,164 +83,19 @@
#include <ioctl.h>
#endif
-xs::signaler_t::signaler_t ()
-{
- // Create the socketpair for signaling.
- int rc = make_fdpair (&r, &w);
- errno_assert (rc == 0);
-
- // Set both fds to non-blocking mode.
- unblock_socket (w);
- unblock_socket (r);
-}
-
-xs::signaler_t::~signaler_t ()
-{
-#if defined XS_HAVE_EVENTFD
- int rc = close (r);
- errno_assert (rc == 0);
-#elif defined XS_HAVE_WINDOWS
- int rc = closesocket (w);
- wsa_assert (rc != SOCKET_ERROR);
- rc = closesocket (r);
- wsa_assert (rc != SOCKET_ERROR);
-#else
- int rc = close (w);
- errno_assert (rc == 0);
- rc = close (r);
- errno_assert (rc == 0);
-#endif
-}
-
-xs::fd_t xs::signaler_t::get_fd ()
-{
- return r;
-}
-
-void xs::signaler_t::send ()
-{
-#if defined XS_HAVE_EVENTFD
- const uint64_t inc = 1;
- ssize_t sz = write (w, &inc, sizeof (inc));
- errno_assert (sz == sizeof (inc));
-#elif defined XS_HAVE_WINDOWS
- unsigned char dummy = 0;
- int nbytes = ::send (w, (char*) &dummy, sizeof (dummy), 0);
- wsa_assert (nbytes != SOCKET_ERROR);
- xs_assert (nbytes == sizeof (dummy));
-#else
- unsigned char dummy = 0;
- while (true) {
-#if defined MSG_NOSIGNAL
- ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), MSG_NOSIGNAL);
-#else
- ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
-#endif
- if (unlikely (nbytes == -1 && errno == EINTR))
- continue;
- xs_assert (nbytes == sizeof (dummy));
- break;
- }
-#endif
-}
-
-int xs::signaler_t::wait (int timeout_)
-{
-#ifdef XS_SIGNALER_WAIT_BASED_ON_POLL
-
- struct pollfd pfd;
- pfd.fd = r;
- pfd.events = POLLIN;
- int rc = poll (&pfd, 1, timeout_);
- if (unlikely (rc < 0)) {
- xs_assert (errno == EINTR);
- return -1;
- }
- else if (unlikely (rc == 0)) {
- errno = EAGAIN;
- return -1;
- }
- xs_assert (rc == 1);
- xs_assert (pfd.revents & POLLIN);
- return 0;
-
-#elif defined XS_SIGNALER_WAIT_BASED_ON_SELECT
-
- fd_set fds;
- FD_ZERO (&fds);
- FD_SET (r, &fds);
- struct timeval timeout;
- if (timeout_ >= 0) {
- timeout.tv_sec = timeout_ / 1000;
- timeout.tv_usec = timeout_ % 1000 * 1000;
- }
-#ifdef XS_HAVE_WINDOWS
- int rc = select (0, &fds, NULL, NULL,
- timeout_ >= 0 ? &timeout : NULL);
- wsa_assert (rc != SOCKET_ERROR);
-#else
- int rc = select (r + 1, &fds, NULL, NULL,
- timeout_ >= 0 ? &timeout : NULL);
- if (unlikely (rc < 0)) {
- xs_assert (errno == EINTR);
- return -1;
- }
-#endif
- if (unlikely (rc == 0)) {
- errno = EAGAIN;
- return -1;
- }
- xs_assert (rc == 1);
- return 0;
-
-#else
-#error
-#endif
-}
-
-void xs::signaler_t::recv ()
-{
- // Attempt to read a signal.
-#if defined XS_HAVE_EVENTFD
- uint64_t dummy;
- ssize_t sz = read (r, &dummy, sizeof (dummy));
- errno_assert (sz == sizeof (dummy));
-
- // If we accidentally grabbed the next signal along with the current
- // one, return it back to the eventfd object.
- if (unlikely (dummy == 2)) {
- const uint64_t inc = 1;
- ssize_t sz = write (w, &inc, sizeof (inc));
- errno_assert (sz == sizeof (inc));
- return;
- }
-
- xs_assert (dummy == 1);
-#else
- unsigned char dummy;
-#if defined XS_HAVE_WINDOWS
- int nbytes = ::recv (r, (char*) &dummy, sizeof (dummy), 0);
- wsa_assert (nbytes != SOCKET_ERROR);
-#else
- ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
- errno_assert (nbytes >= 0);
-#endif
- xs_assert (nbytes == sizeof (dummy));
- xs_assert (dummy == 0);
-#endif
-}
-
-int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
+static int make_fdpair (xs::fd_t *r_, xs::fd_t *w_)
{
#if defined XS_HAVE_EVENTFD
// Create eventfd object.
#if defined EFD_CLOEXEC
- fd_t fd = eventfd (0, EFD_CLOEXEC);
- errno_assert (fd != -1);
+ xs::fd_t fd = eventfd (0, EFD_CLOEXEC);
+ if (fd == -1)
+ return -1;
#else
- fd_t fd = eventfd (0, 0);
- errno_assert (fd != -1);
+ xs::fd_t fd = eventfd (0, 0);
+ if (fd == -1)
+ return -1;
#if defined FD_CLOEXEC
int rc = fcntl (fd, F_SETFD, FD_CLOEXEC);
errno_assert (rc != -1);
@@ -292,13 +147,18 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Create listening socket.
SOCKET listener;
- listener = open_tcp_socket (AF_INET, false);
- wsa_assert (listener != INVALID_SOCKET);
+ listener = xs::open_socket (AF_INET, SOCK_STREAM, 0);
+ if (listener == xs::retired_fd)
+ return -1;
- // Set SO_REUSEADDR on the listening socket.
- BOOL reuseaddr = 1;
+ // Set SO_REUSEADDR and TCP_NODELAY on listening socket.
+ BOOL so_reuseaddr = 1;
int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
- (char*) &reuseaddr, sizeof (reuseaddr));
+ (char *)&so_reuseaddr, sizeof (so_reuseaddr));
+ wsa_assert (rc != SOCKET_ERROR);
+ BOOL tcp_nodelay = 1;
+ rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
+ (char *)&tcp_nodelay, sizeof (tcp_nodelay));
wsa_assert (rc != SOCKET_ERROR);
// Bind listening socket to the local port.
@@ -306,7 +166,7 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
memset (&addr, 0, sizeof (addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
- addr.sin_port = htons (signaler_port);
+ addr.sin_port = htons (xs::signaler_port);
rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
wsa_assert (rc != SOCKET_ERROR);
@@ -315,8 +175,17 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
wsa_assert (rc != SOCKET_ERROR);
// Create the writer socket.
- *w_ = open_tcp_socket (AF_INET, false);
- wsa_assert (*w_ != INVALID_SOCKET);
+ *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
+ if (*w_ == xs::retired_fd) {
+ rc = closesocket (listener);
+ wsa_assert (rc != SOCKET_ERROR);
+ return -1;
+ }
+
+ // Set TCP_NODELAY on writer socket.
+ rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
+ (char *)&tcp_nodelay, sizeof (tcp_nodelay));
+ wsa_assert (rc != SOCKET_ERROR);
// Connect writer to the listener.
rc = connect (*w_, (sockaddr *) &addr, sizeof (addr));
@@ -324,8 +193,13 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Accept connection from writer.
*r_ = accept (listener, NULL, NULL);
- wsa_assert (*r_ != INVALID_SOCKET);
- tune_tcp_socket (*r_, false);
+ if (*r_ == xs::retired_fd) {
+ rc = closesocket (listener);
+ wsa_assert (rc != SOCKET_ERROR);
+ rc = closesocket (*w_);
+ wsa_assert (rc != SOCKET_ERROR);
+ return -1;
+ }
// We don't need the listening socket anymore. Close it.
rc = closesocket (listener);
@@ -351,14 +225,17 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
lcladdr.sin_port = 0;
- int listener = open_tcp_socket (AF_INET, false);
+ int listener = open_socket (AF_INET, SOCK_STREAM, 0);
errno_assert (listener != -1);
int on = 1;
+ int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
+ errno_assert (rc != -1);
+
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
errno_assert (rc != -1);
- rc = bind (listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
+ rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
errno_assert (rc != -1);
socklen_t lcladdr_len = sizeof (lcladdr);
@@ -369,9 +246,12 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
rc = listen (listener, 1);
errno_assert (rc != -1);
- *w_ = open_tcp_socket (AF_INET, false);
+ *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
errno_assert (*w_ != -1);
+ rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
+ errno_assert (rc != -1);
+
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
errno_assert (rc != -1);
@@ -380,7 +260,6 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
*r_ = accept (listener, NULL, NULL);
errno_assert (*r_ != -1);
- tune_tcp_socket (*r_);
close (listener);
@@ -391,9 +270,12 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
int sv [2];
#if defined XS_HAVE_SOCK_CLOEXEC
int rc = socketpair (AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, sv);
- errno_assert (rc == 0);
+ if (rc == -1)
+ return -1;
#else
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
+ if (rc == -1)
+ return -1;
errno_assert (rc == 0);
#if defined FD_CLOEXEC
rc = fcntl (sv [0], F_SETFD, FD_CLOEXEC);
@@ -409,6 +291,156 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
#endif
}
+int xs::signaler_init (xs::signaler_t *self_)
+{
+ // Create the socketpair for signaling.
+ int rc = make_fdpair (&self_->r, &self_->w);
+ if (rc != 0)
+ return -1;
+
+ // Set both fds to non-blocking mode.
+ unblock_socket (self_->w);
+ unblock_socket (self_->r);
+ return 0;
+}
+
+void xs::signaler_close (xs::signaler_t *self_)
+{
+#if defined XS_HAVE_EVENTFD
+ int rc = close (self_->r);
+ errno_assert (rc == 0);
+#elif defined XS_HAVE_WINDOWS
+ int rc = closesocket (self_->w);
+ wsa_assert (rc != SOCKET_ERROR);
+ rc = closesocket (self_->r);
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ int rc = close (self_->w);
+ errno_assert (rc == 0);
+ rc = close (self_->r);
+ errno_assert (rc == 0);
+#endif
+}
+
+xs::fd_t xs::signaler_fd (xs::signaler_t *self_)
+{
+ return self_->r;
+}
+
+void xs::signaler_send (xs::signaler_t *self_)
+{
+#if defined XS_HAVE_EVENTFD
+ const uint64_t inc = 1;
+ ssize_t sz = write (self_->w, &inc, sizeof (inc));
+ errno_assert (sz == sizeof (inc));
+#elif defined XS_HAVE_WINDOWS
+ unsigned char dummy = 0;
+ int nbytes = ::send (self_->w, (char*) &dummy, sizeof (dummy), 0);
+ wsa_assert (nbytes != SOCKET_ERROR);
+ xs_assert (nbytes == sizeof (dummy));
+#else
+ unsigned char dummy = 0;
+ while (true) {
+#if defined MSG_NOSIGNAL
+ ssize_t nbytes = ::send (self_->w, &dummy, sizeof (dummy),
+ MSG_NOSIGNAL);
+#else
+ ssize_t nbytes = ::send (self_->w, &dummy, sizeof (dummy), 0);
+#endif
+ if (unlikely (nbytes == -1 && errno == EINTR))
+ continue;
+ xs_assert (nbytes == sizeof (dummy));
+ break;
+ }
+#endif
+}
+
+int xs::signaler_wait (xs::signaler_t *self_, int timeout_)
+{
+#ifdef XS_SIGNALER_WAIT_BASED_ON_POLL
+
+ struct pollfd pfd;
+ pfd.fd = self_->r;
+ pfd.events = POLLIN;
+ int rc = poll (&pfd, 1, timeout_);
+ if (unlikely (rc < 0)) {
+ xs_assert (errno == EINTR);
+ return -1;
+ }
+ else if (unlikely (rc == 0)) {
+ errno = EAGAIN;
+ return -1;
+ }
+ xs_assert (rc == 1);
+ xs_assert (pfd.revents & POLLIN);
+ return 0;
+
+#elif defined XS_SIGNALER_WAIT_BASED_ON_SELECT
+
+ fd_set fds;
+ FD_ZERO (&fds);
+ FD_SET (self_->r, &fds);
+ struct timeval timeout;
+ if (timeout_ >= 0) {
+ timeout.tv_sec = timeout_ / 1000;
+ timeout.tv_usec = timeout_ % 1000 * 1000;
+ }
+#ifdef XS_HAVE_WINDOWS
+ int rc = select (0, &fds, NULL, NULL,
+ timeout_ >= 0 ? &timeout : NULL);
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ int rc = select (self_->r + 1, &fds, NULL, NULL,
+ timeout_ >= 0 ? &timeout : NULL);
+ if (unlikely (rc < 0)) {
+ xs_assert (errno == EINTR);
+ return -1;
+ }
+#endif
+ if (unlikely (rc == 0)) {
+ errno = EAGAIN;
+ return -1;
+ }
+ xs_assert (rc == 1);
+ return 0;
+
+#else
+#error
+#endif
+}
+
+void xs::signaler_recv (xs::signaler_t *self_)
+{
+ // Attempt to read a signal.
+#if defined XS_HAVE_EVENTFD
+ uint64_t dummy;
+ ssize_t sz = read (self_->r, &dummy, sizeof (dummy));
+ errno_assert (sz == sizeof (dummy));
+
+ // If we accidentally grabbed the next signal along with the current
+ // one, return it back to the eventfd object.
+ if (unlikely (dummy == 2)) {
+ const uint64_t inc = 1;
+ ssize_t sz = write (self_->w, &inc, sizeof (inc));
+ errno_assert (sz == sizeof (inc));
+ return;
+ }
+
+ xs_assert (dummy == 1);
+#else
+ unsigned char dummy;
+#if defined XS_HAVE_WINDOWS
+ int nbytes = ::recv (self_->r, (char*) &dummy, sizeof (dummy), 0);
+ wsa_assert (nbytes != SOCKET_ERROR);
+#else
+ ssize_t nbytes = ::recv (self_->r, &dummy, sizeof (dummy), 0);
+ errno_assert (nbytes >= 0);
+#endif
+ xs_assert (nbytes == sizeof (dummy));
+ xs_assert (dummy == 0);
+#endif
+}
+
#if defined XS_SIGNALER_WAIT_BASED_ON_SELECT
#undef XS_SIGNALER_WAIT_BASED_ON_SELECT
#endif