diff options
Diffstat (limited to 'src/mailbox.cpp')
-rw-r--r-- | src/mailbox.cpp | 106 |
1 files changed, 98 insertions, 8 deletions
diff --git a/src/mailbox.cpp b/src/mailbox.cpp index 221396b..402d025 100644 --- a/src/mailbox.cpp +++ b/src/mailbox.cpp @@ -18,8 +18,33 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "mailbox.hpp" #include "platform.hpp" + +#if defined ZMQ_FORCE_SELECT +#define ZMQ_RCVTIMEO_BASED_ON_SELECT +#elif defined ZMQ_FORCE_POLL +#define ZMQ_RCVTIMEO_BASED_ON_POLL +#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ + defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ + defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ + defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\ + defined ZMQ_HAVE_NETBSD +#define ZMQ_RCVTIMEO_BASED_ON_POLL +#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS +#define ZMQ_RCVTIMEO_BASED_ON_SELECT +#endif + +// On AIX, poll.h has to be included before zmq.h to get consistent +// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents' +// instead of 'events' and 'revents' and defines macros to map from POSIX-y +// names to AIX-specific names). +#if defined ZMQ_RCVTIMEO_BASED_ON_POLL +#include <poll.h> +#elif defined ZMQ_RCVTIMEO_BASED_ON_SELECT +#include <sys/select.h> +#endif + +#include "mailbox.hpp" #include "err.hpp" #include "fd.hpp" #include "ip.hpp" @@ -79,10 +104,14 @@ void zmq::mailbox_t::send (const command_t &cmd_) zmq_assert (nbytes == sizeof (command_t)); } -int zmq::mailbox_t::recv (command_t *cmd_, bool block_) +int zmq::mailbox_t::recv (command_t *cmd_, int timeout_) { + // If there's a finite timeout, poll on the fd. + if (timeout_ > 0) + return recv_timeout (cmd_, timeout_); + // If required, set the reader to blocking mode. - if (block_) { + if (timeout_ < 0) { unsigned long argp = 0; int rc = ioctlsocket (r, FIONBIO, &argp); wsa_assert (rc != SOCKET_ERROR); @@ -97,7 +126,7 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_) err = EAGAIN; // Re-set the reader to non-blocking mode. - if (block_) { + if (timeout_ < 0) { unsigned long argp = 1; int rc = ioctlsocket (r, FIONBIO, &argp); wsa_assert (rc != SOCKET_ERROR); @@ -194,20 +223,24 @@ void zmq::mailbox_t::send (const command_t &cmd_) zmq_assert (nbytes == sizeof (command_t)); } -int zmq::mailbox_t::recv (command_t *cmd_, bool block_) +int zmq::mailbox_t::recv (command_t *cmd_, int timeout_) { + // If there's a finite timeout, poll on the fd. + if (timeout_ > 0) + return recv_timeout (cmd_, timeout_); + #ifdef MSG_DONTWAIT // Attempt to read an entire command. Returns EAGAIN if non-blocking // mode is requested and a command is not available. ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), - block_ ? 0 : MSG_DONTWAIT); + timeout_ < 0 ? 0 : MSG_DONTWAIT); if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) return -1; #else // If required, set the reader to blocking mode. - if (block_) { + if (timeout_ < 0) { int flags = fcntl (r, F_GETFL, 0); errno_assert (flags >= 0); int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK); @@ -223,7 +256,7 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_) err = errno; // Re-set the reader to non-blocking mode. - if (block_) { + if (timeout_ < 0) { int flags = fcntl (r, F_GETFL, 0); errno_assert (flags >= 0); int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); @@ -380,3 +413,60 @@ int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_) #endif } +int zmq::mailbox_t::recv_timeout (command_t *cmd_, int timeout_) +{ +#ifdef ZMQ_RCVTIMEO_BASED_ON_POLL + + struct pollfd pfd; + pfd.fd = r; + pfd.events = POLLIN; + int rc = poll (&pfd, 1, timeout_); + if (unlikely (rc < 0)) { + zmq_assert (errno == EINTR); + return -1; + } + else if (unlikely (rc == 0)) { + errno = EAGAIN; + return -1; + } + zmq_assert (rc == 1); + zmq_assert (pfd.revents & POLLIN); + +#elif defined ZMQ_RCVTIMEO_BASED_ON_SELECT + + fd_set fds; + FD_ZERO (&fds); + FD_SET (r, &fds); + struct timeval timeout; + timeout.tv_sec = timeout_ / 1000; + timeout.tv_usec = timeout_ % 1000 * 1000; + int rc = select (r + 1, &fds, NULL, NULL, &timeout); + if (unlikely (rc < 0)) { + zmq_assert (errno == EINTR); + return -1; + } + else if (unlikely (rc == 0)) { + errno = EAGAIN; + return -1; + } + zmq_assert (rc == 1); + +#else +#error +#endif + + // The file descriptor is ready for reading. Extract one command out of it. + ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0); + if (unlikely (rc < 0 && errno == EINTR)) + return -1; + zmq_assert (nbytes == sizeof (command_t)); + return 0; +} + +#if defined ZMQ_RCVTIMEO_BASED_ON_SELECT +#undef ZMQ_RCVTIMEO_BASED_ON_SELECT +#endif +#if defined ZMQ_RCVTIMEO_BASED_ON_POLL +#undef ZMQ_RCVTIMEO_BASED_ON_POLL +#endif + |