diff options
-rw-r--r-- | src/atomic_ptr.hpp | 2 | ||||
-rw-r--r-- | src/ctx.cpp | 14 | ||||
-rw-r--r-- | src/io_thread.cpp | 7 | ||||
-rw-r--r-- | src/mailbox.cpp | 51 | ||||
-rw-r--r-- | src/mailbox.hpp | 33 | ||||
-rw-r--r-- | src/reaper.cpp | 8 | ||||
-rw-r--r-- | src/signaler.cpp | 366 | ||||
-rw-r--r-- | src/signaler.hpp | 40 | ||||
-rw-r--r-- | src/socket_base.cpp | 32 | ||||
-rw-r--r-- | src/socket_base.hpp | 6 | ||||
-rw-r--r-- | tests/shutdown_stress.cpp | 13 |
11 files changed, 325 insertions, 247 deletions
diff --git a/src/atomic_ptr.hpp b/src/atomic_ptr.hpp index ac3f791..b197aa1 100644 --- a/src/atomic_ptr.hpp +++ b/src/atomic_ptr.hpp @@ -22,6 +22,8 @@ #ifndef __XS_ATOMIC_PTR_HPP_INCLUDED__ #define __XS_ATOMIC_PTR_HPP_INCLUDED__ +#include <stddef.h> + #include "platform.hpp" #if defined XS_FORCE_MUTEXES diff --git a/src/ctx.cpp b/src/ctx.cpp index 46fa984..8bd3506 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -47,9 +47,12 @@ xs::ctx_t::ctx_t () : max_sockets (512), io_thread_count (1) { + int rc = mailbox_init (&term_mailbox); + errno_assert (rc == 0); + // Plug in the standard plugins. - int rc = plug (prefix_filter); - xs_assert (rc == 0); + rc = plug (prefix_filter); + errno_assert (rc == 0); } bool xs::ctx_t::check_tag () @@ -81,6 +84,9 @@ xs::ctx_t::~ctx_t () if (slots) free (slots); + // Deallocate the termination mailbox. + mailbox_close (&term_mailbox); + // Remove the tag, so that the object is considered dead. tag = 0xdeadbeef; } @@ -112,7 +118,7 @@ int xs::ctx_t::terminate () // Wait till reaper thread closes all the sockets. command_t cmd; - int rc = term_mailbox.recv (&cmd, -1); + int rc = mailbox_recv (&term_mailbox, &cmd, -1); if (rc == -1 && errno == EINTR) return -1; xs_assert (rc == 0); @@ -297,7 +303,7 @@ xs_filter_t *xs::ctx_t::get_filter (int filter_id_) void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_) { - slots [tid_]->send (command_); + mailbox_send (slots [tid_], command_); } xs::io_thread_t *xs::ctx_t::choose_io_thread (uint64_t affinity_) diff --git a/src/io_thread.cpp b/src/io_thread.cpp index dba0f00..df5a623 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -48,15 +48,18 @@ xs::io_thread_t *xs::io_thread_t::create (xs::ctx_t *ctx_, uint32_t tid_) xs::io_thread_t::io_thread_t (xs::ctx_t *ctx_, uint32_t tid_) : object_t (ctx_, tid_) { + int rc = mailbox_init (&mailbox); + errno_assert (rc == 0); } xs::io_thread_t::~io_thread_t () { + mailbox_close (&mailbox); } void xs::io_thread_t::start () { - mailbox_handle = add_fd (mailbox.get_fd (), this); + mailbox_handle = add_fd (mailbox_fd (&mailbox), this); set_pollin (mailbox_handle); xstart (); } @@ -149,7 +152,7 @@ void xs::io_thread_t::in_event (fd_t fd_) // Get the next command. If there is none, exit. command_t cmd; - int rc = mailbox.recv (&cmd, 0); + int rc = mailbox_recv (&mailbox, &cmd, 0); if (rc != 0 && errno == EINTR) continue; if (rc != 0 && errno == EAGAIN) diff --git a/src/mailbox.cpp b/src/mailbox.cpp index c977c5b..17d3dbf 100644 --- a/src/mailbox.cpp +++ b/src/mailbox.cpp @@ -22,60 +22,69 @@ #include "mailbox.hpp" #include "err.hpp" -xs::mailbox_t::mailbox_t () +int xs::mailbox_init (mailbox_t *self_) { + // Initlialise the signaler. + int rc = signaler_init (&self_->signaler); + if (rc != 0) + return -1; + // Get the pipe into passive state. That way, if the users starts by // polling on the associated file descriptor it will get woken up when // new command is posted. - bool ok = cpipe.read (NULL); + bool ok = self_->cpipe.read (NULL); xs_assert (!ok); - active = false; + self_->active = false; + return 0; } -xs::mailbox_t::~mailbox_t () +void xs::mailbox_close (mailbox_t *self_) { + // Deallocate the signaler. + signaler_close (&self_->signaler); + // TODO: Retrieve and deallocate commands inside the cpipe. } -xs::fd_t xs::mailbox_t::get_fd () +xs::fd_t xs::mailbox_fd (mailbox_t *self_) { - return signaler.get_fd (); + return signaler_fd (&self_->signaler); } -void xs::mailbox_t::send (const command_t &cmd_) +void xs::mailbox_send (mailbox_t *self_, const command_t &cmd_) { - sync.lock (); - cpipe.write (cmd_, false); - bool ok = cpipe.flush (); - sync.unlock (); + self_->sync.lock (); + self_->cpipe.write (cmd_, false); + bool ok = self_->cpipe.flush (); + self_->sync.unlock (); if (!ok) - signaler.send (); + signaler_send (&self_->signaler); } -int xs::mailbox_t::recv (command_t *cmd_, int timeout_) +int xs::mailbox_recv (mailbox_t *self_, command_t *cmd_, int timeout_) { // Try to get the command straight away. - if (active) { - bool ok = cpipe.read (cmd_); + if (self_->active) { + bool ok = self_->cpipe.read (cmd_); if (ok) return 0; // If there are no more commands available, switch into passive state. - active = false; - signaler.recv (); + self_->active = false; + signaler_recv (&self_->signaler); } // Wait for signal from the command sender. - int rc = signaler.wait (timeout_); + int rc = signaler_wait (&self_->signaler, timeout_); if (rc != 0 && (errno == EAGAIN || errno == EINTR)) return -1; + errno_assert (rc == 0); // We've got the signal. Now we can switch into active state. - active = true; + self_->active = true; // Get a command. - errno_assert (rc == 0); - bool ok = cpipe.read (cmd_); + bool ok = self_->cpipe.read (cmd_); xs_assert (ok); return 0; } diff --git a/src/mailbox.hpp b/src/mailbox.hpp index e77364d..ea9dc08 100644 --- a/src/mailbox.hpp +++ b/src/mailbox.hpp @@ -22,32 +22,22 @@ #ifndef __XS_MAILBOX_HPP_INCLUDED__ #define __XS_MAILBOX_HPP_INCLUDED__ -#include <stddef.h> - -#include "platform.hpp" #include "signaler.hpp" -#include "fd.hpp" #include "config.hpp" #include "command.hpp" #include "ypipe.hpp" #include "mutex.hpp" +#include "fd.hpp" namespace xs { - class mailbox_t - { - public: - - mailbox_t (); - ~mailbox_t (); - - fd_t get_fd (); - void send (const command_t &cmd_); - int recv (command_t *cmd_, int timeout_); - - private: + // Mailbox stores a list of commands sent to a particular object. + // Multiple threads can send commands to the mailbox in parallel. + // Only a single thread can read commands from the mailbox. + typedef struct + { // The pipe to store actual commands. typedef ypipe_t <command_t, command_pipe_granularity> cpipe_t; cpipe_t cpipe; @@ -65,10 +55,13 @@ namespace xs // read commands from it. bool active; - // Disable copying of mailbox_t object. - mailbox_t (const mailbox_t&); - const mailbox_t &operator = (const mailbox_t&); - }; + } mailbox_t; + + int mailbox_init (mailbox_t *self_); + void mailbox_close (mailbox_t *self_); + fd_t mailbox_fd (mailbox_t *self_); + void mailbox_send (mailbox_t *self_, const command_t &cmd_); + int mailbox_recv (mailbox_t *self_, command_t *cmd_, int timeout_); } diff --git a/src/reaper.cpp b/src/reaper.cpp index b610151..52b3e2d 100644 --- a/src/reaper.cpp +++ b/src/reaper.cpp @@ -27,16 +27,20 @@ xs::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) : sockets (0), terminating (false) { + int rc = mailbox_init (&mailbox); + errno_assert (rc == 0); + io_thread = io_thread_t::create (ctx_, tid_); xs_assert (io_thread); - mailbox_handle = io_thread->add_fd (mailbox.get_fd (), this); + mailbox_handle = io_thread->add_fd (mailbox_fd (&mailbox), this); io_thread->set_pollin (mailbox_handle); } xs::reaper_t::~reaper_t () { delete io_thread; + mailbox_close (&mailbox); } xs::mailbox_t *xs::reaper_t::get_mailbox () @@ -61,7 +65,7 @@ void xs::reaper_t::in_event (fd_t fd_) // Get the next command. If there is none, exit. command_t cmd; - int rc = mailbox.recv (&cmd, 0); + int rc = mailbox_recv (&mailbox, &cmd, 0); if (rc != 0 && errno == EINTR) continue; if (rc != 0 && errno == EAGAIN) diff --git a/src/signaler.cpp b/src/signaler.cpp index 78a13b2..af905a3 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -83,164 +83,19 @@ #include <ioctl.h> #endif -xs::signaler_t::signaler_t () -{ - // Create the socketpair for signaling. - int rc = make_fdpair (&r, &w); - errno_assert (rc == 0); - - // Set both fds to non-blocking mode. - unblock_socket (w); - unblock_socket (r); -} - -xs::signaler_t::~signaler_t () -{ -#if defined XS_HAVE_EVENTFD - int rc = close (r); - errno_assert (rc == 0); -#elif defined XS_HAVE_WINDOWS - int rc = closesocket (w); - wsa_assert (rc != SOCKET_ERROR); - rc = closesocket (r); - wsa_assert (rc != SOCKET_ERROR); -#else - int rc = close (w); - errno_assert (rc == 0); - rc = close (r); - errno_assert (rc == 0); -#endif -} - -xs::fd_t xs::signaler_t::get_fd () -{ - return r; -} - -void xs::signaler_t::send () -{ -#if defined XS_HAVE_EVENTFD - const uint64_t inc = 1; - ssize_t sz = write (w, &inc, sizeof (inc)); - errno_assert (sz == sizeof (inc)); -#elif defined XS_HAVE_WINDOWS - unsigned char dummy = 0; - int nbytes = ::send (w, (char*) &dummy, sizeof (dummy), 0); - wsa_assert (nbytes != SOCKET_ERROR); - xs_assert (nbytes == sizeof (dummy)); -#else - unsigned char dummy = 0; - while (true) { -#if defined MSG_NOSIGNAL - ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), MSG_NOSIGNAL); -#else - ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0); -#endif - if (unlikely (nbytes == -1 && errno == EINTR)) - continue; - xs_assert (nbytes == sizeof (dummy)); - break; - } -#endif -} - -int xs::signaler_t::wait (int timeout_) -{ -#ifdef XS_SIGNALER_WAIT_BASED_ON_POLL - - struct pollfd pfd; - pfd.fd = r; - pfd.events = POLLIN; - int rc = poll (&pfd, 1, timeout_); - if (unlikely (rc < 0)) { - xs_assert (errno == EINTR); - return -1; - } - else if (unlikely (rc == 0)) { - errno = EAGAIN; - return -1; - } - xs_assert (rc == 1); - xs_assert (pfd.revents & POLLIN); - return 0; - -#elif defined XS_SIGNALER_WAIT_BASED_ON_SELECT - - fd_set fds; - FD_ZERO (&fds); - FD_SET (r, &fds); - struct timeval timeout; - if (timeout_ >= 0) { - timeout.tv_sec = timeout_ / 1000; - timeout.tv_usec = timeout_ % 1000 * 1000; - } -#ifdef XS_HAVE_WINDOWS - int rc = select (0, &fds, NULL, NULL, - timeout_ >= 0 ? &timeout : NULL); - wsa_assert (rc != SOCKET_ERROR); -#else - int rc = select (r + 1, &fds, NULL, NULL, - timeout_ >= 0 ? &timeout : NULL); - if (unlikely (rc < 0)) { - xs_assert (errno == EINTR); - return -1; - } -#endif - if (unlikely (rc == 0)) { - errno = EAGAIN; - return -1; - } - xs_assert (rc == 1); - return 0; - -#else -#error -#endif -} - -void xs::signaler_t::recv () -{ - // Attempt to read a signal. -#if defined XS_HAVE_EVENTFD - uint64_t dummy; - ssize_t sz = read (r, &dummy, sizeof (dummy)); - errno_assert (sz == sizeof (dummy)); - - // If we accidentally grabbed the next signal along with the current - // one, return it back to the eventfd object. - if (unlikely (dummy == 2)) { - const uint64_t inc = 1; - ssize_t sz = write (w, &inc, sizeof (inc)); - errno_assert (sz == sizeof (inc)); - return; - } - - xs_assert (dummy == 1); -#else - unsigned char dummy; -#if defined XS_HAVE_WINDOWS - int nbytes = ::recv (r, (char*) &dummy, sizeof (dummy), 0); - wsa_assert (nbytes != SOCKET_ERROR); -#else - ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0); - errno_assert (nbytes >= 0); -#endif - xs_assert (nbytes == sizeof (dummy)); - xs_assert (dummy == 0); -#endif -} - -int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) +static int make_fdpair (xs::fd_t *r_, xs::fd_t *w_) { #if defined XS_HAVE_EVENTFD // Create eventfd object. #if defined EFD_CLOEXEC - fd_t fd = eventfd (0, EFD_CLOEXEC); - errno_assert (fd != -1); + xs::fd_t fd = eventfd (0, EFD_CLOEXEC); + if (fd == -1) + return -1; #else - fd_t fd = eventfd (0, 0); - errno_assert (fd != -1); + xs::fd_t fd = eventfd (0, 0); + if (fd == -1) + return -1; #if defined FD_CLOEXEC int rc = fcntl (fd, F_SETFD, FD_CLOEXEC); errno_assert (rc != -1); @@ -292,13 +147,18 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) // Create listening socket. SOCKET listener; - listener = open_tcp_socket (AF_INET, false); - wsa_assert (listener != INVALID_SOCKET); + listener = xs::open_socket (AF_INET, SOCK_STREAM, 0); + if (listener == xs::retired_fd) + return -1; - // Set SO_REUSEADDR on the listening socket. - BOOL reuseaddr = 1; + // Set SO_REUSEADDR and TCP_NODELAY on listening socket. + BOOL so_reuseaddr = 1; int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR, - (char*) &reuseaddr, sizeof (reuseaddr)); + (char *)&so_reuseaddr, sizeof (so_reuseaddr)); + wsa_assert (rc != SOCKET_ERROR); + BOOL tcp_nodelay = 1; + rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, + (char *)&tcp_nodelay, sizeof (tcp_nodelay)); wsa_assert (rc != SOCKET_ERROR); // Bind listening socket to the local port. @@ -306,7 +166,7 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) memset (&addr, 0, sizeof (addr)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); - addr.sin_port = htons (signaler_port); + addr.sin_port = htons (xs::signaler_port); rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr)); wsa_assert (rc != SOCKET_ERROR); @@ -315,8 +175,17 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) wsa_assert (rc != SOCKET_ERROR); // Create the writer socket. - *w_ = open_tcp_socket (AF_INET, false); - wsa_assert (*w_ != INVALID_SOCKET); + *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); + if (*w_ == xs::retired_fd) { + rc = closesocket (listener); + wsa_assert (rc != SOCKET_ERROR); + return -1; + } + + // Set TCP_NODELAY on writer socket. + rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, + (char *)&tcp_nodelay, sizeof (tcp_nodelay)); + wsa_assert (rc != SOCKET_ERROR); // Connect writer to the listener. rc = connect (*w_, (sockaddr *) &addr, sizeof (addr)); @@ -324,8 +193,13 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) // Accept connection from writer. *r_ = accept (listener, NULL, NULL); - wsa_assert (*r_ != INVALID_SOCKET); - tune_tcp_socket (*r_, false); + if (*r_ == xs::retired_fd) { + rc = closesocket (listener); + wsa_assert (rc != SOCKET_ERROR); + rc = closesocket (*w_); + wsa_assert (rc != SOCKET_ERROR); + return -1; + } // We don't need the listening socket anymore. Close it. rc = closesocket (listener); @@ -351,14 +225,17 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); lcladdr.sin_port = 0; - int listener = open_tcp_socket (AF_INET, false); + int listener = open_socket (AF_INET, SOCK_STREAM, 0); errno_assert (listener != -1); int on = 1; + int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); + errno_assert (rc != -1); + rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); errno_assert (rc != -1); - rc = bind (listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); + rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); errno_assert (rc != -1); socklen_t lcladdr_len = sizeof (lcladdr); @@ -369,9 +246,12 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) rc = listen (listener, 1); errno_assert (rc != -1); - *w_ = open_tcp_socket (AF_INET, false); + *w_ = open_socket (AF_INET, SOCK_STREAM, 0); errno_assert (*w_ != -1); + rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); + errno_assert (rc != -1); + rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); errno_assert (rc != -1); @@ -380,7 +260,6 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) *r_ = accept (listener, NULL, NULL); errno_assert (*r_ != -1); - tune_tcp_socket (*r_); close (listener); @@ -391,9 +270,12 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) int sv [2]; #if defined XS_HAVE_SOCK_CLOEXEC int rc = socketpair (AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, sv); - errno_assert (rc == 0); + if (rc == -1) + return -1; #else int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); + if (rc == -1) + return -1; errno_assert (rc == 0); #if defined FD_CLOEXEC rc = fcntl (sv [0], F_SETFD, FD_CLOEXEC); @@ -409,6 +291,156 @@ int xs::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) #endif } +int xs::signaler_init (xs::signaler_t *self_) +{ + // Create the socketpair for signaling. + int rc = make_fdpair (&self_->r, &self_->w); + if (rc != 0) + return -1; + + // Set both fds to non-blocking mode. + unblock_socket (self_->w); + unblock_socket (self_->r); + return 0; +} + +void xs::signaler_close (xs::signaler_t *self_) +{ +#if defined XS_HAVE_EVENTFD + int rc = close (self_->r); + errno_assert (rc == 0); +#elif defined XS_HAVE_WINDOWS + int rc = closesocket (self_->w); + wsa_assert (rc != SOCKET_ERROR); + rc = closesocket (self_->r); + wsa_assert (rc != SOCKET_ERROR); +#else + int rc = close (self_->w); + errno_assert (rc == 0); + rc = close (self_->r); + errno_assert (rc == 0); +#endif +} + +xs::fd_t xs::signaler_fd (xs::signaler_t *self_) +{ + return self_->r; +} + +void xs::signaler_send (xs::signaler_t *self_) +{ +#if defined XS_HAVE_EVENTFD + const uint64_t inc = 1; + ssize_t sz = write (self_->w, &inc, sizeof (inc)); + errno_assert (sz == sizeof (inc)); +#elif defined XS_HAVE_WINDOWS + unsigned char dummy = 0; + int nbytes = ::send (self_->w, (char*) &dummy, sizeof (dummy), 0); + wsa_assert (nbytes != SOCKET_ERROR); + xs_assert (nbytes == sizeof (dummy)); +#else + unsigned char dummy = 0; + while (true) { +#if defined MSG_NOSIGNAL + ssize_t nbytes = ::send (self_->w, &dummy, sizeof (dummy), + MSG_NOSIGNAL); +#else + ssize_t nbytes = ::send (self_->w, &dummy, sizeof (dummy), 0); +#endif + if (unlikely (nbytes == -1 && errno == EINTR)) + continue; + xs_assert (nbytes == sizeof (dummy)); + break; + } +#endif +} + +int xs::signaler_wait (xs::signaler_t *self_, int timeout_) +{ +#ifdef XS_SIGNALER_WAIT_BASED_ON_POLL + + struct pollfd pfd; + pfd.fd = self_->r; + pfd.events = POLLIN; + int rc = poll (&pfd, 1, timeout_); + if (unlikely (rc < 0)) { + xs_assert (errno == EINTR); + return -1; + } + else if (unlikely (rc == 0)) { + errno = EAGAIN; + return -1; + } + xs_assert (rc == 1); + xs_assert (pfd.revents & POLLIN); + return 0; + +#elif defined XS_SIGNALER_WAIT_BASED_ON_SELECT + + fd_set fds; + FD_ZERO (&fds); + FD_SET (self_->r, &fds); + struct timeval timeout; + if (timeout_ >= 0) { + timeout.tv_sec = timeout_ / 1000; + timeout.tv_usec = timeout_ % 1000 * 1000; + } +#ifdef XS_HAVE_WINDOWS + int rc = select (0, &fds, NULL, NULL, + timeout_ >= 0 ? &timeout : NULL); + wsa_assert (rc != SOCKET_ERROR); +#else + int rc = select (self_->r + 1, &fds, NULL, NULL, + timeout_ >= 0 ? &timeout : NULL); + if (unlikely (rc < 0)) { + xs_assert (errno == EINTR); + return -1; + } +#endif + if (unlikely (rc == 0)) { + errno = EAGAIN; + return -1; + } + xs_assert (rc == 1); + return 0; + +#else +#error +#endif +} + +void xs::signaler_recv (xs::signaler_t *self_) +{ + // Attempt to read a signal. +#if defined XS_HAVE_EVENTFD + uint64_t dummy; + ssize_t sz = read (self_->r, &dummy, sizeof (dummy)); + errno_assert (sz == sizeof (dummy)); + + // If we accidentally grabbed the next signal along with the current + // one, return it back to the eventfd object. + if (unlikely (dummy == 2)) { + const uint64_t inc = 1; + ssize_t sz = write (self_->w, &inc, sizeof (inc)); + errno_assert (sz == sizeof (inc)); + return; + } + + xs_assert (dummy == 1); +#else + unsigned char dummy; +#if defined XS_HAVE_WINDOWS + int nbytes = ::recv (self_->r, (char*) &dummy, sizeof (dummy), 0); + wsa_assert (nbytes != SOCKET_ERROR); +#else + ssize_t nbytes = ::recv (self_->r, &dummy, sizeof (dummy), 0); + errno_assert (nbytes >= 0); +#endif + xs_assert (nbytes == sizeof (dummy)); + xs_assert (dummy == 0); +#endif +} + #if defined XS_SIGNALER_WAIT_BASED_ON_SELECT #undef XS_SIGNALER_WAIT_BASED_ON_SELECT #endif diff --git a/src/signaler.hpp b/src/signaler.hpp index c289490..20c2c36 100644 --- a/src/signaler.hpp +++ b/src/signaler.hpp @@ -31,32 +31,30 @@ namespace xs // given moment. Attempt to send a signal before receiving the previous // one will result in undefined behaviour. - class signaler_t - { - public: + typedef struct { + fd_t w; + fd_t r; + } signaler_t; - signaler_t (); - ~signaler_t (); + // Initialise the signaler. + int signaler_init (signaler_t *self_); - fd_t get_fd (); - void send (); - int wait (int timeout_); - void recv (); - - private: + // Destroy the signaler. + void signaler_close (signaler_t *self_); - // Creates a pair of filedescriptors that will be used - // to pass the signals. - static int make_fdpair (fd_t *r_, fd_t *w_); + // Return file decriptor that you can poll on to get notified when + // signal is sent. + fd_t signaler_fd (signaler_t *self_); - // Underlying write & read file descriptor. - fd_t w; - fd_t r; + // Send a signal. + void signaler_send (signaler_t *self_); + + // Wait for a signal. up to timout_ milliseconds. + // The signale is *not* consumed by this function. + int signaler_wait (signaler_t *self_, int timeout_); - // Disable copying of signaler_t object. - signaler_t (const signaler_t&); - const signaler_t &operator = (const signaler_t&); - }; + // Wait for and consume a signal. + void signaler_recv (signaler_t *self_); } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 0e856ff..eb9b491 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -111,6 +111,9 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_, return NULL; } alloc_assert (s); + int rc = s->init (); + if (rc != 0) + return NULL; return s; } @@ -119,6 +122,7 @@ xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : tag (0xbaddecaf), ctx_terminated (false), destroyed (false), + initialised (false), last_tsc (0), ticks (0), rcvmore (false) @@ -126,9 +130,25 @@ xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : options.socket_id = sid_; } +int xs::socket_base_t::init () +{ + xs_assert (!initialised); + int rc = mailbox_init (&mailbox); + if (rc != 0) { + destroyed = true; + delete this; + return -1; + } + initialised = true; + return 0; +} + xs::socket_base_t::~socket_base_t () { xs_assert (destroyed); + + if (initialised) + mailbox_close (&mailbox); } xs::mailbox_t *xs::socket_base_t::get_mailbox () @@ -146,7 +166,7 @@ void xs::socket_base_t::stop () } int xs::socket_base_t::parse_uri (const char *uri_, - std::string &protocol_, std::string &address_) + std::string &protocol_, std::string &address_) { xs_assert (uri_ != NULL); @@ -265,7 +285,7 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_, errno = EINVAL; return -1; } - *((fd_t*) optval_) = mailbox.get_fd (); + *((fd_t*) optval_) = mailbox_fd (&mailbox); *optvallen_ = sizeof (fd_t); return 0; } @@ -666,7 +686,7 @@ void xs::socket_base_t::start_reaping (io_thread_t *io_thread_) { // Plug the socket to the reaper thread. io_thread = io_thread_; - handle = io_thread->add_fd (mailbox.get_fd (), this); + handle = io_thread->add_fd (mailbox_fd (&mailbox), this); io_thread->set_pollin (handle); // Initialise the termination and check whether it can be deallocated @@ -682,7 +702,7 @@ int xs::socket_base_t::process_commands (int timeout_, bool throttle_) if (timeout_ != 0) { // If we are asked to wait, simply ask mailbox to wait. - rc = mailbox.recv (&cmd, timeout_); + rc = mailbox_recv (&mailbox, &cmd, timeout_); } else { @@ -709,7 +729,7 @@ int xs::socket_base_t::process_commands (int timeout_, bool throttle_) } // Check whether there are any commands pending for this thread. - rc = mailbox.recv (&cmd, 0); + rc = mailbox_recv (&mailbox, &cmd, 0); } // Process all the commands available at the moment. @@ -720,7 +740,7 @@ int xs::socket_base_t::process_commands (int timeout_, bool throttle_) return -1; errno_assert (rc == 0); cmd.destination->process_command (cmd); - rc = mailbox.recv (&cmd, 0); + rc = mailbox_recv (&mailbox, &cmd, 0); } if (ctx_terminated) { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 100b5c5..740fdde 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -130,6 +130,11 @@ namespace xs private: + // Initialise the object. This function is separate from constructor + // so that it can return errors. If not successful, it deallocates + // the socket straight away. + int init (); + // To be called after processing commands or invoking any command // handlers explicitly. If required, it will deallocate the socket. void check_destroy (); @@ -174,6 +179,7 @@ namespace xs // Socket's mailbox object. mailbox_t mailbox; + bool initialised; // List of attached pipes. typedef array_t <pipe_t, 3> pipes_t; diff --git a/tests/shutdown_stress.cpp b/tests/shutdown_stress.cpp index 27ab1f0..f106cc0 100644 --- a/tests/shutdown_stress.cpp +++ b/tests/shutdown_stress.cpp @@ -69,13 +69,18 @@ int XS_TEST_MAIN () for (i = 0; i != THREAD_COUNT; i++) { s2 = xs_socket (ctx, XS_SUB); - assert (s2); - threads [i] = thread_create (shutdown_stress_worker, s2); - assert (threads [i]); + if (!s2 && (errno == EMFILE || errno == ENFILE)) + threads [i] = NULL; + else { + assert (s2); + threads [i] = thread_create (shutdown_stress_worker, s2); + assert (threads [i]); + } } for (i = 0; i != THREAD_COUNT; i++) - thread_join (threads [i]); + if (threads [i]) + thread_join (threads [i]); rc = xs_close (s1); assert (rc == 0); |