diff options
| -rw-r--r-- | src/Makefile.am | 2 | ||||
| -rw-r--r-- | src/config.hpp | 3 | ||||
| -rw-r--r-- | src/mailbox.cpp | 441 | ||||
| -rw-r--r-- | src/mailbox.hpp | 29 | ||||
| -rw-r--r-- | src/signaler.cpp | 340 | ||||
| -rw-r--r-- | src/signaler.hpp | 63 | ||||
| -rw-r--r-- | tests/test_shutdown_stress.cpp | 2 | 
7 files changed, 458 insertions, 422 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 45e8ac0..9c2fbf1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -60,6 +60,7 @@ libzmq_la_SOURCES = \      select.hpp \      semaphore.hpp \      session.hpp \ +    signaler.hpp \      socket_base.hpp \      stdint.hpp \      sub.hpp \ @@ -123,6 +124,7 @@ libzmq_la_SOURCES = \      router.cpp \      select.cpp \      session.cpp \ +    signaler.cpp \      socket_base.cpp \      sub.cpp \      tcp_connecter.cpp \ diff --git a/src/config.hpp b/src/config.hpp index dff3f87..3984fcf 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -36,6 +36,9 @@ namespace zmq          //  memory allocation by approximately 99.6%          message_pipe_granularity = 256, +        //  Commands in pipe per allocation event. +        command_pipe_granularity = 16, +          //  Size in bytes of the largest message that is still copied around          //  rather than being reference-counted.          max_vsm_size = 29, diff --git a/src/mailbox.cpp b/src/mailbox.cpp index 7fdb93e..9ef3e19 100644 --- a/src/mailbox.cpp +++ b/src/mailbox.cpp @@ -18,439 +18,64 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ -#include "platform.hpp" - -#if defined ZMQ_FORCE_SELECT -#define ZMQ_RCVTIMEO_BASED_ON_SELECT -#elif defined ZMQ_FORCE_POLL -#define ZMQ_RCVTIMEO_BASED_ON_POLL -#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ -    defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ -    defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ -    defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\ -    defined ZMQ_HAVE_NETBSD -#define ZMQ_RCVTIMEO_BASED_ON_POLL -#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS -#define ZMQ_RCVTIMEO_BASED_ON_SELECT -#endif - -//  On AIX, poll.h has to be included before zmq.h to get consistent -//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents' -//  instead of 'events' and 'revents' and defines macros to map from POSIX-y -//  names to AIX-specific names). -#if defined ZMQ_RCVTIMEO_BASED_ON_POLL -#include <poll.h> -#elif defined ZMQ_RCVTIMEO_BASED_ON_SELECT -#if defined ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#elif defined ZMQ_HAVE_HPUX -#include <sys/param.h> -#include <sys/types.h> -#include <sys/time.h> -#elif defined ZMQ_HAVE_OPENVMS -#include <sys/types.h> -#include <sys/time.h> -#else -#include <sys/select.h> -#endif -#endif -  #include "mailbox.hpp"  #include "err.hpp" -#include "fd.hpp" -#include "ip.hpp" - -#if defined ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#else -#include <unistd.h> -#include <fcntl.h> -#include <limits.h> -#include <netinet/tcp.h> -#include <unistd.h> -#include <sys/types.h> -#include <sys/socket.h> -#endif - -zmq::fd_t zmq::mailbox_t::get_fd () -{ -    return r; -} - -#if defined ZMQ_HAVE_WINDOWS -zmq::mailbox_t::mailbox_t () : -    blocking (true) +zmq::mailbox_t::mailbox_t ()  { -    //  Create the socketpair for signalling. -    int rc = make_socketpair (&r, &w); -    errno_assert (rc == 0); - -    //  Set the writer to non-blocking mode. -    unsigned long argp = 1; -    rc = ioctlsocket (w, FIONBIO, &argp); -    wsa_assert (rc != SOCKET_ERROR); +    //  Get the pipe into passive state. That way, if the users starts by +    //  polling on the associated file descriptor it will get woken up when +    //  new command is posted. +    bool ok = cpipe.read (NULL); +    zmq_assert (!ok); +    active = false;  }  zmq::mailbox_t::~mailbox_t ()  { -    int rc = closesocket (w); -    wsa_assert (rc != SOCKET_ERROR); - -    rc = closesocket (r); -    wsa_assert (rc != SOCKET_ERROR); -} - -void zmq::mailbox_t::send (const command_t &cmd_) -{ -    //  TODO: Implement SNDBUF auto-resizing as for POSIX platforms. -    //  In the mean time, the following code with assert if the send() -    //  call would block. -    int nbytes = ::send (w, (char *)&cmd_, sizeof (command_t), 0); -    wsa_assert (nbytes != SOCKET_ERROR); -    zmq_assert (nbytes == sizeof (command_t)); +    //  TODO: Retrieve and deallocate commands inside the cpipe.  } -int zmq::mailbox_t::recv (command_t *cmd_, int timeout_) -{ -    //  If there's a finite timeout, poll on the fd. -    if (timeout_ > 0) -        return recv_timeout (cmd_, timeout_); - -    //  If required, switch the reader to blocking or non-blocking mode. -    if ((timeout_ < 0 && !blocking) || (timeout_ == 0 && blocking)) { -        blocking = (timeout_ < 0); -        unsigned long argp = blocking ? 0 : 1; -        int rc = ioctlsocket (r, FIONBIO, &argp); -        wsa_assert (rc != SOCKET_ERROR); -    } - -    //  Attempt to read an entire command. -    int nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0); -    if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) { -        errno = EAGAIN; -        return -1; -    } - -    //  Sanity check for success. -    wsa_assert (nbytes != SOCKET_ERROR); - -    //  Check whether we haven't got half of command. -    zmq_assert (nbytes == sizeof (command_t)); - -    return 0; -} - -#else - -zmq::mailbox_t::mailbox_t () : -    blocking (true) -{ -#ifdef PIPE_BUF -    //  Make sure that command can be written to the socket in atomic fashion. -    //  If this wasn't guaranteed, commands from different threads would be -    //  interleaved. -    zmq_assert (sizeof (command_t) <= PIPE_BUF); -#endif - -    //  Create the socketpair for signaling. -    int rc = make_socketpair (&r, &w); -    errno_assert (rc == 0); - -    //  Set the writer to non-blocking mode. -    int flags = fcntl (w, F_GETFL, 0); -    errno_assert (flags >= 0); -    rc = fcntl (w, F_SETFL, flags | O_NONBLOCK); -    errno_assert (rc == 0); -} - -zmq::mailbox_t::~mailbox_t () +zmq::fd_t zmq::mailbox_t::get_fd ()  { -    close (w); -    close (r); +    return signaler.get_fd ();  }  void zmq::mailbox_t::send (const command_t &cmd_)  { -    //  Attempt to write an entire command without blocking. -    ssize_t nbytes; -    do { -        nbytes = ::send (w, &cmd_, sizeof (command_t), 0); -    } while (nbytes == -1 && errno == EINTR); - -    //  Attempt to increase mailbox SNDBUF if the send failed. -    if (nbytes == -1 && errno == EAGAIN) { -        int old_sndbuf, new_sndbuf; -        socklen_t sndbuf_size = sizeof old_sndbuf; - -        //  Retrieve current send buffer size. -        int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf, -            &sndbuf_size); -        errno_assert (rc == 0); -        new_sndbuf = old_sndbuf * 2; - -        //  Double the new send buffer size. -        rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size); -        errno_assert (rc == 0); - -        //  Verify that the OS actually honored the request. -        rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size); -        errno_assert (rc == 0); -        zmq_assert (new_sndbuf > old_sndbuf); - -        //  Retry the sending operation; at this point it must succeed. -        do { -            nbytes = ::send (w, &cmd_, sizeof (command_t), 0); -        } while (nbytes == -1 && errno == EINTR); -    } -    errno_assert (nbytes != -1); - -    //  This should never happen as we've already checked that command size is -    //  less than PIPE_BUF. -    zmq_assert (nbytes == sizeof (command_t)); +    sync.lock (); +    cpipe.write (cmd_, false); +    bool ok = cpipe.flush (); +    sync.unlock (); +    if (!ok) +        signaler.send ();  }  int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)  { -    //  If there's a finite timeout, poll on the fd. -    if (timeout_ > 0) -        return recv_timeout (cmd_, timeout_); - -#ifdef MSG_DONTWAIT - -    //  Attempt to read an entire command. Returns EAGAIN if non-blocking -    //  mode is requested and a command is not available. -    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), -        timeout_ < 0 ? 0 : MSG_DONTWAIT); -    if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) -        return -1; -#else - -    //  If required, switch the reader to blocking or non-blocking mode. -    if ((timeout_ < 0 && !blocking) || (timeout_ == 0 && blocking)) { -        blocking = (timeout_ < 0); -        int flags = fcntl (r, F_GETFL, 0); -        errno_assert (flags >= 0); -        int rc = fcntl (r, F_SETFL, -            blocking ? flags | O_NONBLOCK : flags & ~O_NONBLOCK); -        errno_assert (rc == 0); +    //  Try to get the command straight away. +    if (active) { +        bool ok = cpipe.read (cmd_); +        if (ok) +            return 0; + +        //  If there are no more commands available, switch into passive state. +        active = false; +        signaler.recv ();      } -    //  Attempt to read an entire command. -    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0); -    if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))   +    //  Wait for signal from the command sender. +    int rc = signaler.wait (timeout_); +    if (rc != 0 && errno == EAGAIN)          return -1; -#endif - -    //  Sanity check for success. -    errno_assert (nbytes != -1); - -    //  Check whether we haven't got half of command. -    zmq_assert (nbytes == sizeof (command_t)); - -    return 0; -} - -#endif - -int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_) -{ -#if defined ZMQ_HAVE_WINDOWS - -    //  Windows has no 'socketpair' function. CreatePipe is no good as pipe -    //  handles cannot be polled on. Here we create the socketpair by hand. -    *w_ = INVALID_SOCKET; -    *r_ = INVALID_SOCKET; - -    //  Create listening socket. -    SOCKET listener; -    listener = socket (AF_INET, SOCK_STREAM, 0); -    wsa_assert (listener != INVALID_SOCKET); - -    //  Set SO_REUSEADDR and TCP_NODELAY on listening socket. -    BOOL so_reuseaddr = 1; -    int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR, -        (char *)&so_reuseaddr, sizeof (so_reuseaddr)); -    wsa_assert (rc != SOCKET_ERROR); -    BOOL tcp_nodelay = 1; -    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, -        (char *)&tcp_nodelay, sizeof (tcp_nodelay)); -    wsa_assert (rc != SOCKET_ERROR); - -    //  Bind listening socket to any free local port. -    struct sockaddr_in addr; -    memset (&addr, 0, sizeof (addr)); -    addr.sin_family = AF_INET; -    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); -    addr.sin_port = 0; -    rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr)); -    wsa_assert (rc != SOCKET_ERROR); - -    //  Retrieve local port listener is bound to (into addr). -    int addrlen = sizeof (addr); -    rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen); -    wsa_assert (rc != SOCKET_ERROR); - -    //  Listen for incomming connections. -    rc = listen (listener, 1); -    wsa_assert (rc != SOCKET_ERROR); - -    //  Create the writer socket. -    *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0,  0); -    wsa_assert (*w_ != INVALID_SOCKET); - -    //  Set TCP_NODELAY on writer socket. -    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, -        (char *)&tcp_nodelay, sizeof (tcp_nodelay)); -    wsa_assert (rc != SOCKET_ERROR); - -    //  Connect writer to the listener. -    rc = connect (*w_, (sockaddr *) &addr, sizeof (addr)); -    wsa_assert (rc != SOCKET_ERROR); - -    //  Accept connection from writer. -    *r_ = accept (listener, NULL, NULL); -    wsa_assert (*r_ != INVALID_SOCKET); - -    //  We don't need the listening socket anymore. Close it. -    rc = closesocket (listener); -    wsa_assert (rc != SOCKET_ERROR); - -    return 0; - -#elif defined ZMQ_HAVE_OPENVMS - -    //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further, -    //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which -    //  can lead to performance problems. -    // -    //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll -    //  create the socket pair manually. -    sockaddr_in lcladdr; -    memset (&lcladdr, 0, sizeof (lcladdr)); -    lcladdr.sin_family = AF_INET; -    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); -    lcladdr.sin_port = 0; - -    int listener = socket (AF_INET, SOCK_STREAM, 0); -    errno_assert (listener != -1); - -    int on = 1; -    int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); -    errno_assert (rc != -1); - -    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); -    errno_assert (rc != -1); - -    rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); -    errno_assert (rc != -1); - -    socklen_t lcladdr_len = sizeof (lcladdr); - -    rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len); -    errno_assert (rc != -1); - -    rc = listen (listener, 1); -    errno_assert (rc != -1); - -    *w_ = socket (AF_INET, SOCK_STREAM, 0); -    errno_assert (*w_ != -1); - -    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); -    errno_assert (rc != -1); - -    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); -    errno_assert (rc != -1); +    //  We've got the signal. Now we can switch into active state. +    active = true; -    rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); -    errno_assert (rc != -1); - -    *r_ = accept (listener, NULL, NULL); -    errno_assert (*r_ != -1); - -    close (listener); - -    return 0; - -#else // All other implementations support socketpair() - -    int sv [2]; -    int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); +    //  Get a command.      errno_assert (rc == 0); -    *w_ = sv [0]; -    *r_ = sv [1]; +    bool ok = cpipe.read (cmd_); +    zmq_assert (ok);      return 0; - -#endif  } -int zmq::mailbox_t::recv_timeout (command_t *cmd_, int timeout_) -{ -#ifdef ZMQ_RCVTIMEO_BASED_ON_POLL - -    struct pollfd pfd; -    pfd.fd = r; -    pfd.events = POLLIN; -    int rc = poll (&pfd, 1, timeout_); -    if (unlikely (rc < 0)) { -        zmq_assert (errno == EINTR); -        return -1; -    } -    else if (unlikely (rc == 0)) { -        errno = EAGAIN; -        return -1; -    } -    zmq_assert (rc == 1); -    zmq_assert (pfd.revents & POLLIN); - -#elif defined ZMQ_RCVTIMEO_BASED_ON_SELECT - -    fd_set fds; -    FD_ZERO (&fds); -    FD_SET (r, &fds); -    struct timeval timeout; -    timeout.tv_sec = timeout_ / 1000; -    timeout.tv_usec = timeout_ % 1000 * 1000; -#ifdef ZMQ_HAVE_WINDOWS -    int rc = select (0, &fds, NULL, NULL, &timeout); -    wsa_assert (rc != SOCKET_ERROR); -#else -    int rc = select (r + 1, &fds, NULL, NULL, &timeout); -    if (unlikely (rc < 0)) { -        zmq_assert (errno == EINTR); -        return -1; -    } -#endif -    if (unlikely (rc == 0)) { -        errno = EAGAIN; -        return -1; -    } -    zmq_assert (rc == 1); - - -#else -#error -#endif - -    //  The file descriptor is ready for reading. Extract one command out of it. -#ifdef ZMQ_HAVE_WINDOWS -    int nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0); -    wsa_assert (nbytes != SOCKET_ERROR); -#else -    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0); -    if (unlikely (rc < 0 && errno == EINTR)) -        return -1; -    errno_assert (nbytes > 0); -#endif -    zmq_assert (nbytes == sizeof (command_t)); -    return 0; -} - -#if defined ZMQ_RCVTIMEO_BASED_ON_SELECT -#undef ZMQ_RCVTIMEO_BASED_ON_SELECT -#endif -#if defined ZMQ_RCVTIMEO_BASED_ON_POLL -#undef ZMQ_RCVTIMEO_BASED_ON_POLL -#endif - diff --git a/src/mailbox.hpp b/src/mailbox.hpp index eb02e39..0675b99 100644 --- a/src/mailbox.hpp +++ b/src/mailbox.hpp @@ -24,10 +24,12 @@  #include <stddef.h>  #include "platform.hpp" +#include "signaler.hpp"  #include "fd.hpp" -#include "stdint.hpp"  #include "config.hpp"  #include "command.hpp" +#include "ypipe.hpp" +#include "mutex.hpp"  namespace zmq  { @@ -45,21 +47,22 @@ namespace zmq      private: -        //  Platform-dependent function to create a socketpair. -        static int make_socketpair (fd_t *r_, fd_t *w_); +        //  The pipe to store actual commands. +        typedef ypipe_t <command_t, command_pipe_granularity> cpipe_t; +        cpipe_t cpipe; -        //  Receives a command with the specific timeout. -        //  This function is not to be used for non-blocking or inifinitely -        //  blocking recvs. -        int recv_timeout (command_t *cmd_, int timeout_); +        //  Signaler to pass signals from writer thread to reader thread. +        signaler_t signaler; -        //  Write & read end of the socketpair. -        fd_t w; -        fd_t r; +        //  There's only one thread receiving from the mailbox, but there +        //  is arbitrary number of threads sending. Given that ypipe requires +        //  synchronised access on both of its endpoints, we have to synchronise +        //  the sending side. +        mutex_t sync; -        //  Used on platforms where there's no MSG_DONTWAIT functionality. -        //  True if the read socket is set to the blocking state. -        bool blocking; +        //  True if the underlying pipe is active, ie. when we are allowed to +        //  read commands from it. +        bool active;          //  Disable copying of mailbox_t object.          mailbox_t (const mailbox_t&); diff --git a/src/signaler.cpp b/src/signaler.cpp new file mode 100644 index 0000000..2ecfb98 --- /dev/null +++ b/src/signaler.cpp @@ -0,0 +1,340 @@ +/* +    Copyright (c) 2007-2011 iMatix Corporation +    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "platform.hpp" + +#if defined ZMQ_FORCE_SELECT +#define ZMQ_SIGNALER_WAIT_BASED_ON_SELECT +#elif defined ZMQ_FORCE_POLL +#define ZMQ_SIGNALER_WAIT_BASED_ON_POLL +#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ +    defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ +    defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ +    defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\ +    defined ZMQ_HAVE_NETBSD +#define ZMQ_SIGNALER_WAIT_BASED_ON_POLL +#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS +#define ZMQ_SIGNALER_WAIT_BASED_ON_SELECT +#endif + +//  On AIX, poll.h has to be included before zmq.h to get consistent +//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents' +//  instead of 'events' and 'revents' and defines macros to map from POSIX-y +//  names to AIX-specific names). +#if defined ZMQ_SIGNALER_WAIT_BASED_ON_POLL +#include <poll.h> +#elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#elif defined ZMQ_HAVE_HPUX +#include <sys/param.h> +#include <sys/types.h> +#include <sys/time.h> +#elif defined ZMQ_HAVE_OPENVMS +#include <sys/types.h> +#include <sys/time.h> +#else +#include <sys/select.h> +#endif +#endif + +#include "signaler.hpp" +#include "likely.hpp" +#include "err.hpp" +#include "fd.hpp" +#include "ip.hpp" + +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include <unistd.h> +#include <fcntl.h> +#include <limits.h> +#include <netinet/tcp.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#endif + +zmq::signaler_t::signaler_t () +{ +    //  Create the socketpair for signaling. +    int rc = make_fdpair (&r, &w); +    errno_assert (rc == 0); + +    //  Set both fds to non-blocking mode. +#if defined ZMQ_HAVE_WINDOWS +    unsigned long argp = 1; +    rc = ioctlsocket (w, FIONBIO, &argp); +    wsa_assert (rc != SOCKET_ERROR); +    rc = ioctlsocket (r, FIONBIO, &argp); +    wsa_assert (rc != SOCKET_ERROR); +#else +    int flags = fcntl (w, F_GETFL, 0); +    errno_assert (flags >= 0); +    rc = fcntl (w, F_SETFL, flags | O_NONBLOCK); +    errno_assert (rc == 0); +    flags = fcntl (r, F_GETFL, 0); +    errno_assert (flags >= 0); +    rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); +    errno_assert (rc == 0); +#endif +} + +zmq::signaler_t::~signaler_t () +{ +#if defined ZMQ_HAVE_WINDOWS +    int rc = closesocket (w); +    wsa_assert (rc != SOCKET_ERROR); +    rc = closesocket (r); +    wsa_assert (rc != SOCKET_ERROR); +#else +    close (w); +    close (r); +#endif +} + +zmq::fd_t zmq::signaler_t::get_fd () +{ +    return r; +} + +void zmq::signaler_t::send () +{ +#if defined ZMQ_HAVE_WINDOWS +    unsigned char dummy = 0; +    int nbytes = ::send (w, &dummy, sizeof (dummy), 0); +    wsa_assert (nbytes != SOCKET_ERROR); +    zmq_assert (nbytes == sizeof (dummy)); +#else +    unsigned char dummy = 0; +    while (true) { +        ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0); +        if (unlikely (nbytes == -1 && errno == EINTR)) +            continue; +        zmq_assert (nbytes == sizeof (dummy)); +        break; +    } +#endif +} + +int zmq::signaler_t::wait (int timeout_) +{ +#ifdef ZMQ_SIGNALER_WAIT_BASED_ON_POLL + +    struct pollfd pfd; +    pfd.fd = r; +    pfd.events = POLLIN; +    int rc = poll (&pfd, 1, timeout_); +    if (unlikely (rc < 0)) { +        zmq_assert (errno == EINTR); +        return -1; +    } +    else if (unlikely (rc == 0)) { +        errno = EAGAIN; +        return -1; +    } +    zmq_assert (rc == 1); +    zmq_assert (pfd.revents & POLLIN); +    return 0; + +#elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT + +    fd_set fds; +    FD_ZERO (&fds); +    FD_SET (r, &fds); +    struct timeval timeout; +    timeout.tv_sec = timeout_ / 1000; +    timeout.tv_usec = timeout_ % 1000 * 1000; +#ifdef ZMQ_HAVE_WINDOWS +    int rc = select (0, &fds, NULL, NULL, &timeout); +    wsa_assert (rc != SOCKET_ERROR); +#else +    int rc = select (r + 1, &fds, NULL, NULL, &timeout); +    if (unlikely (rc < 0)) { +        zmq_assert (errno == EINTR); +        return -1; +    } +#endif +    if (unlikely (rc == 0)) { +        errno = EAGAIN; +        return -1; +    } +    zmq_assert (rc == 1); +    return 0; + +#else +#error +#endif +} + +void zmq::signaler_t::recv () +{ +    //  Attempt to read a signal. +    unsigned char dummy; +#if ZMQ_HAVE_WINDOWS +    int nbytes = ::recv (r, &dummy, sizeof (dummy), 0); +    wsa_assert (nbytes != SOCKET_ERROR); +#else +    ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0); +    errno_assert (nbytes >= 0); +#endif +    zmq_assert (nbytes == sizeof (dummy)); +    zmq_assert (dummy == 0); +} + +int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) +{ +#if defined ZMQ_HAVE_WINDOWS + +    //  Windows has no 'socketpair' function. CreatePipe is no good as pipe +    //  handles cannot be polled on. Here we create the socketpair by hand. +    *w_ = INVALID_SOCKET; +    *r_ = INVALID_SOCKET; + +    //  Create listening socket. +    SOCKET listener; +    listener = socket (AF_INET, SOCK_STREAM, 0); +    wsa_assert (listener != INVALID_SOCKET); + +    //  Set SO_REUSEADDR and TCP_NODELAY on listening socket. +    BOOL so_reuseaddr = 1; +    int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR, +        (char *)&so_reuseaddr, sizeof (so_reuseaddr)); +    wsa_assert (rc != SOCKET_ERROR); +    BOOL tcp_nodelay = 1; +    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, +        (char *)&tcp_nodelay, sizeof (tcp_nodelay)); +    wsa_assert (rc != SOCKET_ERROR); + +    //  Bind listening socket to any free local port. +    struct sockaddr_in addr; +    memset (&addr, 0, sizeof (addr)); +    addr.sin_family = AF_INET; +    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); +    addr.sin_port = 0; +    rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr)); +    wsa_assert (rc != SOCKET_ERROR); + +    //  Retrieve local port listener is bound to (into addr). +    int addrlen = sizeof (addr); +    rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen); +    wsa_assert (rc != SOCKET_ERROR); + +    //  Listen for incomming connections. +    rc = listen (listener, 1); +    wsa_assert (rc != SOCKET_ERROR); + +    //  Create the writer socket. +    *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0,  0); +    wsa_assert (*w_ != INVALID_SOCKET); + +    //  Set TCP_NODELAY on writer socket. +    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, +        (char *)&tcp_nodelay, sizeof (tcp_nodelay)); +    wsa_assert (rc != SOCKET_ERROR); + +    //  Connect writer to the listener. +    rc = connect (*w_, (sockaddr *) &addr, sizeof (addr)); +    wsa_assert (rc != SOCKET_ERROR); + +    //  Accept connection from writer. +    *r_ = accept (listener, NULL, NULL); +    wsa_assert (*r_ != INVALID_SOCKET); + +    //  We don't need the listening socket anymore. Close it. +    rc = closesocket (listener); +    wsa_assert (rc != SOCKET_ERROR); + +    return 0; + +#elif defined ZMQ_HAVE_OPENVMS + +    //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further, +    //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which +    //  can lead to performance problems. +    // +    //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll +    //  create the socket pair manually. +    sockaddr_in lcladdr; +    memset (&lcladdr, 0, sizeof (lcladdr)); +    lcladdr.sin_family = AF_INET; +    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); +    lcladdr.sin_port = 0; + +    int listener = socket (AF_INET, SOCK_STREAM, 0); +    errno_assert (listener != -1); + +    int on = 1; +    int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); +    errno_assert (rc != -1); + +    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); +    errno_assert (rc != -1); + +    rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); +    errno_assert (rc != -1); + +    socklen_t lcladdr_len = sizeof (lcladdr); + +    rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len); +    errno_assert (rc != -1); + +    rc = listen (listener, 1); +    errno_assert (rc != -1); + +    *w_ = socket (AF_INET, SOCK_STREAM, 0); +    errno_assert (*w_ != -1); + +    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); +    errno_assert (rc != -1); + +    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); +    errno_assert (rc != -1); + +    rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); +    errno_assert (rc != -1); + +    *r_ = accept (listener, NULL, NULL); +    errno_assert (*r_ != -1); + +    close (listener); + +    return 0; + +#else // All other implementations support socketpair() + +    int sv [2]; +    int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); +    errno_assert (rc == 0); +    *w_ = sv [0]; +    *r_ = sv [1]; +    return 0; + +#endif +} + +#if defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT +#undef ZMQ_SIGNALER_WAIT_BASED_ON_SELECT +#endif +#if defined ZMQ_SIGNALER_WAIT_BASED_ON_POLL +#undef ZMQ_SIGNALER_WAIT_BASED_ON_POLL +#endif + diff --git a/src/signaler.hpp b/src/signaler.hpp new file mode 100644 index 0000000..2ebff41 --- /dev/null +++ b/src/signaler.hpp @@ -0,0 +1,63 @@ +/* +    Copyright (c) 2007-2011 iMatix Corporation +    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__ +#define __ZMQ_SIGNALER_HPP_INCLUDED__ + +#include "fd.hpp" + +namespace zmq +{ + +    //  This is a cross-platform equivalent to signal_fd. However, as opposed +    //  to signal_fd there can be at most one signal in the signaler at any +    //  given moment. Attempt to send a signal before receiving the previous +    //  one will result in undefined behaviour. + +    class signaler_t +    { +    public: + +        signaler_t (); +        ~signaler_t (); + +        fd_t get_fd (); +        void send (); +        int wait (int timeout_); +        void recv (); +         +    private: + +        //  Creates a pair of filedescriptors that will be used +        //  to pass the signals. +        static int make_fdpair (fd_t *r_, fd_t *w_); + +        //  Write & read end of the socketpair. +        fd_t w; +        fd_t r; + +        //  Disable copying of signaler_t object. +        signaler_t (const signaler_t&); +        const signaler_t &operator = (const signaler_t&); +    }; + +} + +#endif diff --git a/tests/test_shutdown_stress.cpp b/tests/test_shutdown_stress.cpp index ef81758..b3ee90f 100644 --- a/tests/test_shutdown_stress.cpp +++ b/tests/test_shutdown_stress.cpp @@ -23,7 +23,7 @@  #include <pthread.h>  #include <stddef.h> -#define THREAD_COUNT 10 +#define THREAD_COUNT 100  extern "C"  {  | 
