summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/atomic_ptr.hpp2
-rw-r--r--src/ctx.cpp14
-rw-r--r--src/io_thread.cpp7
-rw-r--r--src/mailbox.cpp51
-rw-r--r--src/mailbox.hpp33
-rw-r--r--src/reaper.cpp8
-rw-r--r--src/signaler.cpp366
-rw-r--r--src/signaler.hpp40
-rw-r--r--src/socket_base.cpp32
-rw-r--r--src/socket_base.hpp6
-rw-r--r--tests/shutdown_stress.cpp13
11 files changed, 325 insertions, 247 deletions
diff --git a/src/atomic_ptr.hpp b/src/atomic_ptr.hpp
index ac3f791..b197aa1 100644
--- a/src/atomic_ptr.hpp
+++ b/src/atomic_ptr.hpp
@@ -22,6 +22,8 @@
#ifndef __XS_ATOMIC_PTR_HPP_INCLUDED__
#define __XS_ATOMIC_PTR_HPP_INCLUDED__
+#include <stddef.h>
+
#include "platform.hpp"
#if defined XS_FORCE_MUTEXES
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 46fa984..8bd3506 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -47,9 +47,12 @@ xs::ctx_t::ctx_t () :
max_sockets (512),
io_thread_count (1)
{
+ int rc = mailbox_init (&term_mailbox);
+ errno_assert (rc == 0);
+
// Plug in the standard plugins.
- int rc = plug (prefix_filter);
- xs_assert (rc == 0);
+ rc = plug (prefix_filter);
+ errno_assert (rc == 0);
}
bool xs::ctx_t::check_tag ()
@@ -81,6 +84,9 @@ xs::ctx_t::~ctx_t ()
if (slots)
free (slots);
+ // Deallocate the termination mailbox.
+ mailbox_close (&term_mailbox);
+
// Remove the tag, so that the object is considered dead.
tag = 0xdeadbeef;
}
@@ -112,7 +118,7 @@ int xs::ctx_t::terminate ()
// Wait till reaper thread closes all the sockets.
command_t cmd;
- int rc = term_mailbox.recv (&cmd, -1);
+ int rc = mailbox_recv (&term_mailbox, &cmd, -1);
if (rc == -1 && errno == EINTR)
return -1;
xs_assert (rc == 0);
@@ -297,7 +303,7 @@ xs_filter_t *xs::ctx_t::get_filter (int filter_id_)
void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{
- slots [tid_]->send (command_);
+ mailbox_send (slots [tid_], command_);
}
xs::io_thread_t *xs::ctx_t::choose_io_thread (uint64_t affinity_)
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index dba0f00..df5a623 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -48,15 +48,18 @@ xs::io_thread_t *xs::io_thread_t::create (xs::ctx_t *ctx_, uint32_t tid_)
xs::io_thread_t::io_thread_t (xs::ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_)
{
+ int rc = mailbox_init (&mailbox);
+ errno_assert (rc == 0);
}
xs::io_thread_t::~io_thread_t ()
{
+ mailbox_close (&mailbox);
}
void xs::io_thread_t::start ()
{
- mailbox_handle = add_fd (mailbox.get_fd (), this);
+ mailbox_handle = add_fd (mailbox_fd (&mailbox), this);
set_pollin (mailbox_handle);
xstart ();
}
@@ -149,7 +152,7 @@ void xs::io_thread_t::in_event (fd_t fd_)
// Get the next command. If there is none, exit.
command_t cmd;
- int rc = mailbox.recv (&cmd, 0);
+ int rc = mailbox_recv (&mailbox, &cmd, 0);
if (rc != 0 && errno == EINTR)
continue;
if (rc != 0 && errno == EAGAIN)
diff --git a/src/mailbox.cpp b/src/mailbox.cpp
index c977c5b..17d3dbf 100644
--- a/src/mailbox.cpp
+++ b/src/mailbox.cpp
@@ -22,60 +22,69 @@
#include "mailbox.hpp"
#include "err.hpp"
-xs::mailbox_t::mailbox_t ()
+int xs::mailbox_init (mailbox_t *self_)
{
+ // Initlialise the signaler.
+ int rc = signaler_init (&self_->signaler);
+ if (rc != 0)
+ return -1;
+
// Get the pipe into passive state. That way, if the users starts by
// polling on the associated file descriptor it will get woken up when
// new command is posted.
- bool ok = cpipe.read (NULL);
+ bool ok = self_->cpipe.read (NULL);
xs_assert (!ok);
- active = false;
+ self_->active = false;
+ return 0;
}
-xs::mailbox_t::~mailbox_t ()
+void xs::mailbox_close (mailbox_t *self_)
{
+ // Deallocate the signaler.
+ signaler_close (&self_->signaler);
+
// TODO: Retrieve and deallocate commands inside the cpipe.
}
-xs::fd_t xs::mailbox_t::get_fd ()
+xs::fd_t xs::mailbox_fd (mailbox_t *self_)
{
- return signaler.get_fd ();
+ return signaler_fd (&self_->signaler);
}
-void xs::mailbox_t::send (const command_t &cmd_)
+void xs::mailbox_send (mailbox_t *self_, const command_t &cmd_)
{
- sync.lock ();
- cpipe.write (cmd_, false);
- bool ok = cpipe.flush ();
- sync.unlock ();
+ self_->sync.lock ();
+ self_->cpipe.write (cmd_, false);
+ bool ok = self_->cpipe.flush ();
+ self_->sync.unlock ();
if (!ok)
- signaler.send ();
+ signaler_send (&self_->signaler);
}
-int xs::mailbox_t::recv (command_t *cmd_, int timeout_)
+int xs::mailbox_recv (mailbox_t *self_, command_t *cmd_, int timeout_)
{
// Try to get the command straight away.
- if (active) {
- bool ok = cpipe.read (cmd_);
+ if (self_->active) {
+ bool ok = self_->cpipe.read (cmd_);
if (ok)
return 0;
// If there are no more commands available, switch into passive state.
- active = false;
- signaler.recv ();
+ self_->active = false;
+ signaler_recv (&self_->signaler);
}
// Wait for signal from the command sender.
- int rc = signaler.wait (timeout_);
+ int rc = signaler_wait (&self_->signaler, timeout_);
if (rc != 0 && (errno == EAGAIN || errno == EINTR))
return -1;
+ errno_assert (rc == 0);
// We've got the signal. Now we can switch into active state.
- active = true;
+ self_->active = true;
// Get a command.
- errno_assert (rc == 0);
- bool ok = cpipe.read (cmd_);
+ bool ok = self_->cpipe.read (cmd_);
xs_assert (ok);
return 0;
}
diff --git a/src/mailbox.hpp b/src/mailbox.hpp
index e77364d..ea9dc08 100644
--- a/src/mailbox.hpp
+++ b/src/mailbox.hpp
@@ -22,32 +22,22 @@
#ifndef __XS_MAILBOX_HPP_INCLUDED__
#define __XS_MAILBOX_HPP_INCLUDED__
-#include <stddef.h>
-
-#include "platform.hpp"
#include "signaler.hpp"
-#include "fd.hpp"
#include "config.hpp"
#include "command.hpp"
#include "ypipe.hpp"
#include "mutex.hpp"
+#include "fd.hpp"
namespace xs
{
- class mailbox_t
- {
- public:
-
- mailbox_t ();
- ~mailbox_t ();
-
- fd_t get_fd ();
- void send (const command_t &cmd_);
- int recv (command_t *cmd_, int timeout_);
-
- private:
+ // Mailbox stores a list of commands sent to a particular object.
+ // Multiple threads can send commands to the mailbox in parallel.
+ // Only a single thread can read commands from the mailbox.
+ typedef struct
+ {
// The pipe to store actual commands.
typedef ypipe_t <command_t, command_pipe_granularity> cpipe_t;
cpipe_t cpipe;
@@ -65,10 +55,13 @@ namespace xs
// read commands from it.
bool active;
- // Disable copying of mailbox_t object.
- mailbox_t (const mailbox_t&);
- const mailbox_t &operator = (const mailbox_t&);
- };
+ } mailbox_t;
+
+ int mailbox_init (mailbox_t *self_);
+ void mailbox_close (mailbox_t *self_);
+ fd_t mailbox_fd (mailbox_t *self_);
+ void mailbox_send (mailbox_t *self_, const command_t &cmd_);
+ int mailbox_recv (mailbox_t *self_, command_t *cmd_, int timeout_);
}
diff --git a/src/reaper.cpp b/src/reaper.cpp
index b610151..52b3e2d 100644
--- a/src/reaper.cpp
+++ b/src/reaper.cpp
@@ -27,16 +27,20 @@ xs::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
sockets (0),
terminating (false)
{
+ int rc = mailbox_init (&mailbox);
+ errno_assert (rc == 0);
+
io_thread = io_thread_t::create (ctx_, tid_);
xs_assert (io_thread);
- mailbox_handle = io_thread->add_fd (mailbox.get_fd (), this);
+ mailbox_handle = io_thread->add_fd (mailbox_fd (&mailbox), this);
io_thread->set_pollin (mailbox_handle);
}
xs::reaper_t::~reaper_t ()
{
delete io_thread;
+ mailbox_close (&mailbox);
}
xs::mailbox_t *xs::reaper_t::get_mailbox ()
@@ -61,7 +65,7 @@ void xs::reaper_t::in_event (fd_t fd_)
// Get the next command. If there is none, exit.
command_t cmd;
- int rc = mailbox.recv (&cmd, 0);
+ int rc = mailbox_recv (&mailbox, &cmd, 0);
if (rc != 0 && errno == EINTR)
continue;
if (rc != 0 && errno == EAGAIN)
diff --git a/src/signaler.cpp b/src/signaler.cpp
index 78a13b2..af905a3 100644
--- a/src/signaler.cpp
+++ b/src/signaler.cpp
@@ -83,164 +83,19 @@
#include <ioctl.h>
#endif
-xs::signaler_t::signaler_t ()
-{
- // Create the socketpair for signaling.
- int rc = make_fdpair (&r, &w);
- errno_assert (rc == 0);
-
- // Set both fds to non-blocking mode.
- unblock_socket (w);
- unblock_socket (r);
-}
-
-xs::signaler_t::~signaler_t ()
-{
-#if defined XS_HAVE_EVENTFD
- int rc = close (r);
- errno_assert (rc == 0);
-#elif defined XS_HAVE_WINDOWS
- int rc = closesocket (w);
- wsa_assert (rc != SOCKET_ERROR);
- rc = closesocket (r);
- wsa_assert (rc != SOCKET_ERROR);
-#else
- int rc = close (w);
- errno_assert (rc == 0);
- rc = close (r);
- errno_assert (rc == 0);
-#endif
-}
-
-xs::fd_t xs::signaler_t::get_fd ()
-{
- return r;
-}
-
-void xs::signaler_t::send ()
-{
-#if defined XS_HAVE_EVENTFD
- const uint64_t inc = 1;
- ssize_t sz = write (w, &inc, sizeof (inc));
- errno_assert (sz == sizeof (inc));
-#elif defined XS_HAVE_WINDOWS
- unsigned char dummy = 0;
- int nbytes = ::send (w, (char*) &dummy, sizeof (dummy), 0);
- wsa_assert (nbytes != SOCKET_ERROR);
- xs_assert (nbytes == sizeof (dummy));
-#else
- unsigned char dummy = 0;
- while (true) {
-#if defined MSG_NOSIGNAL
- ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), MSG_NOSIGNAL);
-#else
- ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
-#endif
- if (unlikely (nbytes == -1 && errno == EINTR))
- continue;
- xs_assert (nbytes == sizeof (dummy));
- break;
- }
-#endif
-}
-
-int xs::signaler_t::wait (int timeout_)
-{
-#ifdef XS_SIGNALER_WAIT_BASED_ON_POLL
-
- struct pollfd pfd;
- pfd.fd = r;
- pfd.events = POLLIN;
- int rc = poll (&pfd, 1, timeout_);
- if (unlikely (rc < 0)) {
- xs_assert (errno == EINTR);
- return -1;
- }
- else if (unlikely (rc == 0)) {
- errno = EAGAIN;
- return -1;
- }
- xs_assert (rc == 1);
- xs_assert (pfd.revents & POLLIN);
- return 0;
-
-#elif defined XS_SIGNALER_WAIT_BASED_ON_SELECT
-
- fd_set fds;
- FD_ZERO (&fds);
- FD_SET (r, &fds);
- struct timeval timeout;
- if (timeout_ >= 0) {
- timeout.tv_sec = timeout_ / 1000;
- timeout.tv_usec = timeout_ % 1000 * 1000;
- }
-#ifdef XS_HAVE_WINDOWS
- int rc = select (0, &fds, NULL, NULL,
- timeout_ >= 0 ? &timeout : NULL);
- wsa_assert (rc != SOCKET_ERROR);
-#else
- int rc = select (r + 1, &fds, NULL, NULL,
- timeout_ >= 0 ? &timeout : NULL);
- if (unlikely (rc < 0)) {
- xs_assert (errno == EINTR);
- return -1;
- }
-#endif
- if (unlikely (rc == 0)) {
- errno = EAGAIN;
- return -1;
- }
- xs_assert (rc == 1);
- return 0;
-
-#else
-#error
-#endif
-}
-
-void xs::signaler_t::recv ()
-{
- // Attempt to read a signal.
-#if defined XS_HAVE_EVENTFD
- uint64_t dummy;
- ssize_t sz = read (r, &dummy, sizeof (dummy));
- errno_assert (sz == sizeof (dummy));
-
- // If we accidentally grabbed the next signal along with the current
- // one, return it back to the eventfd object.
- if (unlikely (dummy == 2)) {
- const uint64_t inc = 1;
- ssize_t sz = write (w, &inc, sizeof (inc));
- errno_assert (sz == sizeof (inc));
- return;
- }
-
- xs_assert (dummy == 1);
-#else
- unsigned char dummy;
-#if defined XS_HAVE_WINDOWS
- int nbytes = ::recv (r, (char*) &dummy, sizeof (dummy), 0);
- wsa_assert (nbytes != SOCKET_ERROR);
-#else
- ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
- errno_assert (nbytes >= 0);
-#endif
- xs_assert (nbytes == sizeof (dummy));
- xs_assert (dummy == 0);
-#endif
-}
-
-int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
+static int make_fdpair (xs::fd_t *r_, xs::fd_t *w_)
{
#if defined XS_HAVE_EVENTFD
// Create eventfd object.
#if defined EFD_CLOEXEC
- fd_t fd = eventfd (0, EFD_CLOEXEC);
- errno_assert (fd != -1);
+ xs::fd_t fd = eventfd (0, EFD_CLOEXEC);
+ if (fd == -1)
+ return -1;
#else
- fd_t fd = eventfd (0, 0);
- errno_assert (fd != -1);
+ xs::fd_t fd = eventfd (0, 0);
+ if (fd == -1)
+ return -1;
#if defined FD_CLOEXEC
int rc = fcntl (fd, F_SETFD, FD_CLOEXEC);
errno_assert (rc != -1);
@@ -292,13 +147,18 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Create listening socket.
SOCKET listener;
- listener = open_tcp_socket (AF_INET, false);
- wsa_assert (listener != INVALID_SOCKET);
+ listener = xs::open_socket (AF_INET, SOCK_STREAM, 0);
+ if (listener == xs::retired_fd)
+ return -1;
- // Set SO_REUSEADDR on the listening socket.
- BOOL reuseaddr = 1;
+ // Set SO_REUSEADDR and TCP_NODELAY on listening socket.
+ BOOL so_reuseaddr = 1;
int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
- (char*) &reuseaddr, sizeof (reuseaddr));
+ (char *)&so_reuseaddr, sizeof (so_reuseaddr));
+ wsa_assert (rc != SOCKET_ERROR);
+ BOOL tcp_nodelay = 1;
+ rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
+ (char *)&tcp_nodelay, sizeof (tcp_nodelay));
wsa_assert (rc != SOCKET_ERROR);
// Bind listening socket to the local port.
@@ -306,7 +166,7 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
memset (&addr, 0, sizeof (addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
- addr.sin_port = htons (signaler_port);
+ addr.sin_port = htons (xs::signaler_port);
rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
wsa_assert (rc != SOCKET_ERROR);
@@ -315,8 +175,17 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
wsa_assert (rc != SOCKET_ERROR);
// Create the writer socket.
- *w_ = open_tcp_socket (AF_INET, false);
- wsa_assert (*w_ != INVALID_SOCKET);
+ *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
+ if (*w_ == xs::retired_fd) {
+ rc = closesocket (listener);
+ wsa_assert (rc != SOCKET_ERROR);
+ return -1;
+ }
+
+ // Set TCP_NODELAY on writer socket.
+ rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
+ (char *)&tcp_nodelay, sizeof (tcp_nodelay));
+ wsa_assert (rc != SOCKET_ERROR);
// Connect writer to the listener.
rc = connect (*w_, (sockaddr *) &addr, sizeof (addr));
@@ -324,8 +193,13 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Accept connection from writer.
*r_ = accept (listener, NULL, NULL);
- wsa_assert (*r_ != INVALID_SOCKET);
- tune_tcp_socket (*r_, false);
+ if (*r_ == xs::retired_fd) {
+ rc = closesocket (listener);
+ wsa_assert (rc != SOCKET_ERROR);
+ rc = closesocket (*w_);
+ wsa_assert (rc != SOCKET_ERROR);
+ return -1;
+ }
// We don't need the listening socket anymore. Close it.
rc = closesocket (listener);
@@ -351,14 +225,17 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
lcladdr.sin_port = 0;
- int listener = open_tcp_socket (AF_INET, false);
+ int listener = open_socket (AF_INET, SOCK_STREAM, 0);
errno_assert (listener != -1);
int on = 1;
+ int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
+ errno_assert (rc != -1);
+
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
errno_assert (rc != -1);
- rc = bind (listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
+ rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
errno_assert (rc != -1);
socklen_t lcladdr_len = sizeof (lcladdr);
@@ -369,9 +246,12 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
rc = listen (listener, 1);
errno_assert (rc != -1);
- *w_ = open_tcp_socket (AF_INET, false);
+ *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
errno_assert (*w_ != -1);
+ rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
+ errno_assert (rc != -1);
+
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
errno_assert (rc != -1);
@@ -380,7 +260,6 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
*r_ = accept (listener, NULL, NULL);
errno_assert (*r_ != -1);
- tune_tcp_socket (*r_);
close (listener);
@@ -391,9 +270,12 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
int sv [2];
#if defined XS_HAVE_SOCK_CLOEXEC
int rc = socketpair (AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, sv);
- errno_assert (rc == 0);
+ if (rc == -1)
+ return -1;
#else
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
+ if (rc == -1)
+ return -1;
errno_assert (rc == 0);
#if defined FD_CLOEXEC
rc = fcntl (sv [0], F_SETFD, FD_CLOEXEC);
@@ -409,6 +291,156 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
#endif
}
+int xs::signaler_init (xs::signaler_t *self_)
+{
+ // Create the socketpair for signaling.
+ int rc = make_fdpair (&self_->r, &self_->w);
+ if (rc != 0)
+ return -1;
+
+ // Set both fds to non-blocking mode.
+ unblock_socket (self_->w);
+ unblock_socket (self_->r);
+ return 0;
+}
+
+void xs::signaler_close (xs::signaler_t *self_)
+{
+#if defined XS_HAVE_EVENTFD
+ int rc = close (self_->r);
+ errno_assert (rc == 0);
+#elif defined XS_HAVE_WINDOWS
+ int rc = closesocket (self_->w);
+ wsa_assert (rc != SOCKET_ERROR);
+ rc = closesocket (self_->r);
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ int rc = close (self_->w);
+ errno_assert (rc == 0);
+ rc = close (self_->r);
+ errno_assert (rc == 0);
+#endif
+}
+
+xs::fd_t xs::signaler_fd (xs::signaler_t *self_)
+{
+ return self_->r;
+}
+
+void xs::signaler_send (xs::signaler_t *self_)
+{
+#if defined XS_HAVE_EVENTFD
+ const uint64_t inc = 1;
+ ssize_t sz = write (self_->w, &inc, sizeof (inc));
+ errno_assert (sz == sizeof (inc));
+#elif defined XS_HAVE_WINDOWS
+ unsigned char dummy = 0;
+ int nbytes = ::send (self_->w, (char*) &dummy, sizeof (dummy), 0);
+ wsa_assert (nbytes != SOCKET_ERROR);
+ xs_assert (nbytes == sizeof (dummy));
+#else
+ unsigned char dummy = 0;
+ while (true) {
+#if defined MSG_NOSIGNAL
+ ssize_t nbytes = ::send (self_->w, &dummy, sizeof (dummy),
+ MSG_NOSIGNAL);
+#else
+ ssize_t nbytes = ::send (self_->w, &dummy, sizeof (dummy), 0);
+#endif
+ if (unlikely (nbytes == -1 && errno == EINTR))
+ continue;
+ xs_assert (nbytes == sizeof (dummy));
+ break;
+ }
+#endif
+}
+
+int xs::signaler_wait (xs::signaler_t *self_, int timeout_)
+{
+#ifdef XS_SIGNALER_WAIT_BASED_ON_POLL
+
+ struct pollfd pfd;
+ pfd.fd = self_->r;
+ pfd.events = POLLIN;
+ int rc = poll (&pfd, 1, timeout_);
+ if (unlikely (rc < 0)) {
+ xs_assert (errno == EINTR);
+ return -1;
+ }
+ else if (unlikely (rc == 0)) {
+ errno = EAGAIN;
+ return -1;
+ }
+ xs_assert (rc == 1);
+ xs_assert (pfd.revents & POLLIN);
+ return 0;
+
+#elif defined XS_SIGNALER_WAIT_BASED_ON_SELECT
+
+ fd_set fds;
+ FD_ZERO (&fds);
+ FD_SET (self_->r, &fds);
+ struct timeval timeout;
+ if (timeout_ >= 0) {
+ timeout.tv_sec = timeout_ / 1000;
+ timeout.tv_usec = timeout_ % 1000 * 1000;
+ }
+#ifdef XS_HAVE_WINDOWS
+ int rc = select (0, &fds, NULL, NULL,
+ timeout_ >= 0 ? &timeout : NULL);
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ int rc = select (self_->r + 1, &fds, NULL, NULL,
+ timeout_ >= 0 ? &timeout : NULL);
+ if (unlikely (rc < 0)) {
+ xs_assert (errno == EINTR);
+ return -1;
+ }
+#endif
+ if (unlikely (rc == 0)) {
+ errno = EAGAIN;
+ return -1;
+ }
+ xs_assert (rc == 1);
+ return 0;
+
+#else
+#error
+#endif
+}
+
+void xs::signaler_recv (xs::signaler_t *self_)
+{
+ // Attempt to read a signal.
+#if defined XS_HAVE_EVENTFD
+ uint64_t dummy;
+ ssize_t sz = read (self_->r, &dummy, sizeof (dummy));
+ errno_assert (sz == sizeof (dummy));
+
+ // If we accidentally grabbed the next signal along with the current
+ // one, return it back to the eventfd object.
+ if (unlikely (dummy == 2)) {
+ const uint64_t inc = 1;
+ ssize_t sz = write (self_->w, &inc, sizeof (inc));
+ errno_assert (sz == sizeof (inc));
+ return;
+ }
+
+ xs_assert (dummy == 1);
+#else
+ unsigned char dummy;
+#if defined XS_HAVE_WINDOWS
+ int nbytes = ::recv (self_->r, (char*) &dummy, sizeof (dummy), 0);
+ wsa_assert (nbytes != SOCKET_ERROR);
+#else
+ ssize_t nbytes = ::recv (self_->r, &dummy, sizeof (dummy), 0);
+ errno_assert (nbytes >= 0);
+#endif
+ xs_assert (nbytes == sizeof (dummy));
+ xs_assert (dummy == 0);
+#endif
+}
+
#if defined XS_SIGNALER_WAIT_BASED_ON_SELECT
#undef XS_SIGNALER_WAIT_BASED_ON_SELECT
#endif
diff --git a/src/signaler.hpp b/src/signaler.hpp
index c289490..20c2c36 100644
--- a/src/signaler.hpp
+++ b/src/signaler.hpp
@@ -31,32 +31,30 @@ namespace xs
// given moment. Attempt to send a signal before receiving the previous
// one will result in undefined behaviour.
- class signaler_t
- {
- public:
+ typedef struct {
+ fd_t w;
+ fd_t r;
+ } signaler_t;
- signaler_t ();
- ~signaler_t ();
+ // Initialise the signaler.
+ int signaler_init (signaler_t *self_);
- fd_t get_fd ();
- void send ();
- int wait (int timeout_);
- void recv ();
-
- private:
+ // Destroy the signaler.
+ void signaler_close (signaler_t *self_);
- // Creates a pair of filedescriptors that will be used
- // to pass the signals.
- static int make_fdpair (fd_t *r_, fd_t *w_);
+ // Return file decriptor that you can poll on to get notified when
+ // signal is sent.
+ fd_t signaler_fd (signaler_t *self_);
- // Underlying write & read file descriptor.
- fd_t w;
- fd_t r;
+ // Send a signal.
+ void signaler_send (signaler_t *self_);
+
+ // Wait for a signal. up to timout_ milliseconds.
+ // The signale is *not* consumed by this function.
+ int signaler_wait (signaler_t *self_, int timeout_);
- // Disable copying of signaler_t object.
- signaler_t (const signaler_t&);
- const signaler_t &operator = (const signaler_t&);
- };
+ // Wait for and consume a signal.
+ void signaler_recv (signaler_t *self_);
}
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 0e856ff..eb9b491 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -111,6 +111,9 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_,
return NULL;
}
alloc_assert (s);
+ int rc = s->init ();
+ if (rc != 0)
+ return NULL;
return s;
}
@@ -119,6 +122,7 @@ xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
tag (0xbaddecaf),
ctx_terminated (false),
destroyed (false),
+ initialised (false),
last_tsc (0),
ticks (0),
rcvmore (false)
@@ -126,9 +130,25 @@ xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
options.socket_id = sid_;
}
+int xs::socket_base_t::init ()
+{
+ xs_assert (!initialised);
+ int rc = mailbox_init (&mailbox);
+ if (rc != 0) {
+ destroyed = true;
+ delete this;
+ return -1;
+ }
+ initialised = true;
+ return 0;
+}
+
xs::socket_base_t::~socket_base_t ()
{
xs_assert (destroyed);
+
+ if (initialised)
+ mailbox_close (&mailbox);
}
xs::mailbox_t *xs::socket_base_t::get_mailbox ()
@@ -146,7 +166,7 @@ void xs::socket_base_t::stop ()
}
int xs::socket_base_t::parse_uri (const char *uri_,
- std::string &protocol_, std::string &address_)
+ std::string &protocol_, std::string &address_)
{
xs_assert (uri_ != NULL);
@@ -265,7 +285,7 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_,
errno = EINVAL;
return -1;
}
- *((fd_t*) optval_) = mailbox.get_fd ();
+ *((fd_t*) optval_) = mailbox_fd (&mailbox);
*optvallen_ = sizeof (fd_t);
return 0;
}
@@ -666,7 +686,7 @@ void xs::socket_base_t::start_reaping (io_thread_t *io_thread_)
{
// Plug the socket to the reaper thread.
io_thread = io_thread_;
- handle = io_thread->add_fd (mailbox.get_fd (), this);
+ handle = io_thread->add_fd (mailbox_fd (&mailbox), this);
io_thread->set_pollin (handle);
// Initialise the termination and check whether it can be deallocated
@@ -682,7 +702,7 @@ int xs::socket_base_t::process_commands (int timeout_, bool throttle_)
if (timeout_ != 0) {
// If we are asked to wait, simply ask mailbox to wait.
- rc = mailbox.recv (&cmd, timeout_);
+ rc = mailbox_recv (&mailbox, &cmd, timeout_);
}
else {
@@ -709,7 +729,7 @@ int xs::socket_base_t::process_commands (int timeout_, bool throttle_)
}
// Check whether there are any commands pending for this thread.
- rc = mailbox.recv (&cmd, 0);
+ rc = mailbox_recv (&mailbox, &cmd, 0);
}
// Process all the commands available at the moment.
@@ -720,7 +740,7 @@ int xs::socket_base_t::process_commands (int timeout_, bool throttle_)
return -1;
errno_assert (rc == 0);
cmd.destination->process_command (cmd);
- rc = mailbox.recv (&cmd, 0);
+ rc = mailbox_recv (&mailbox, &cmd, 0);
}
if (ctx_terminated) {
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 100b5c5..740fdde 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -130,6 +130,11 @@ namespace xs
private:
+ // Initialise the object. This function is separate from constructor
+ // so that it can return errors. If not successful, it deallocates
+ // the socket straight away.
+ int init ();
+
// To be called after processing commands or invoking any command
// handlers explicitly. If required, it will deallocate the socket.
void check_destroy ();
@@ -174,6 +179,7 @@ namespace xs
// Socket's mailbox object.
mailbox_t mailbox;
+ bool initialised;
// List of attached pipes.
typedef array_t <pipe_t, 3> pipes_t;
diff --git a/tests/shutdown_stress.cpp b/tests/shutdown_stress.cpp
index 27ab1f0..f106cc0 100644
--- a/tests/shutdown_stress.cpp
+++ b/tests/shutdown_stress.cpp
@@ -69,13 +69,18 @@ int XS_TEST_MAIN ()
for (i = 0; i != THREAD_COUNT; i++) {
s2 = xs_socket (ctx, XS_SUB);
- assert (s2);
- threads [i] = thread_create (shutdown_stress_worker, s2);
- assert (threads [i]);
+ if (!s2 && (errno == EMFILE || errno == ENFILE))
+ threads [i] = NULL;
+ else {
+ assert (s2);
+ threads [i] = thread_create (shutdown_stress_worker, s2);
+ assert (threads [i]);
+ }
}
for (i = 0; i != THREAD_COUNT; i++)
- thread_join (threads [i]);
+ if (threads [i])
+ thread_join (threads [i]);
rc = xs_close (s1);
assert (rc == 0);