summaryrefslogtreecommitdiff
path: root/src/mailbox.cpp
diff options
context:
space:
mode:
authorFabien Ninoles <fabien@tzone.org>2011-06-17 12:22:02 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-06-17 12:22:02 +0200
commitd7923f08cab62ef40027a92f596ff45428870838 (patch)
tree370ad14bc9d1ebbc14f9d5f8077f81e28e301f5f /src/mailbox.cpp
parent65d2b70312efb148814b58d9cd38cc7069b53a3b (diff)
Add sockopt ZMQ_RCVTIMEO/ZMQ_SNDTIMEO.
- Add doc and tests - Add options and setup - Wait using poll/select Signed-off-by: Fabien Ninoles <fabien@tzone.org> Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/mailbox.cpp')
-rw-r--r--src/mailbox.cpp106
1 files changed, 98 insertions, 8 deletions
diff --git a/src/mailbox.cpp b/src/mailbox.cpp
index 221396b..402d025 100644
--- a/src/mailbox.cpp
+++ b/src/mailbox.cpp
@@ -18,8 +18,33 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "mailbox.hpp"
#include "platform.hpp"
+
+#if defined ZMQ_FORCE_SELECT
+#define ZMQ_RCVTIMEO_BASED_ON_SELECT
+#elif defined ZMQ_FORCE_POLL
+#define ZMQ_RCVTIMEO_BASED_ON_POLL
+#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
+ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
+ defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
+ defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
+ defined ZMQ_HAVE_NETBSD
+#define ZMQ_RCVTIMEO_BASED_ON_POLL
+#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+#define ZMQ_RCVTIMEO_BASED_ON_SELECT
+#endif
+
+// On AIX, poll.h has to be included before zmq.h to get consistent
+// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
+// instead of 'events' and 'revents' and defines macros to map from POSIX-y
+// names to AIX-specific names).
+#if defined ZMQ_RCVTIMEO_BASED_ON_POLL
+#include <poll.h>
+#elif defined ZMQ_RCVTIMEO_BASED_ON_SELECT
+#include <sys/select.h>
+#endif
+
+#include "mailbox.hpp"
#include "err.hpp"
#include "fd.hpp"
#include "ip.hpp"
@@ -79,10 +104,14 @@ void zmq::mailbox_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t));
}
-int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
+int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
{
+ // If there's a finite timeout, poll on the fd.
+ if (timeout_ > 0)
+ return recv_timeout (cmd_, timeout_);
+
// If required, set the reader to blocking mode.
- if (block_) {
+ if (timeout_ < 0) {
unsigned long argp = 0;
int rc = ioctlsocket (r, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR);
@@ -97,7 +126,7 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
err = EAGAIN;
// Re-set the reader to non-blocking mode.
- if (block_) {
+ if (timeout_ < 0) {
unsigned long argp = 1;
int rc = ioctlsocket (r, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR);
@@ -194,20 +223,24 @@ void zmq::mailbox_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t));
}
-int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
+int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
{
+ // If there's a finite timeout, poll on the fd.
+ if (timeout_ > 0)
+ return recv_timeout (cmd_, timeout_);
+
#ifdef MSG_DONTWAIT
// Attempt to read an entire command. Returns EAGAIN if non-blocking
// mode is requested and a command is not available.
ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),
- block_ ? 0 : MSG_DONTWAIT);
+ timeout_ < 0 ? 0 : MSG_DONTWAIT);
if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
return -1;
#else
// If required, set the reader to blocking mode.
- if (block_) {
+ if (timeout_ < 0) {
int flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
@@ -223,7 +256,7 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
err = errno;
// Re-set the reader to non-blocking mode.
- if (block_) {
+ if (timeout_ < 0) {
int flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
@@ -380,3 +413,60 @@ int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_)
#endif
}
+int zmq::mailbox_t::recv_timeout (command_t *cmd_, int timeout_)
+{
+#ifdef ZMQ_RCVTIMEO_BASED_ON_POLL
+
+ struct pollfd pfd;
+ pfd.fd = r;
+ pfd.events = POLLIN;
+ int rc = poll (&pfd, 1, timeout_);
+ if (unlikely (rc < 0)) {
+ zmq_assert (errno == EINTR);
+ return -1;
+ }
+ else if (unlikely (rc == 0)) {
+ errno = EAGAIN;
+ return -1;
+ }
+ zmq_assert (rc == 1);
+ zmq_assert (pfd.revents & POLLIN);
+
+#elif defined ZMQ_RCVTIMEO_BASED_ON_SELECT
+
+ fd_set fds;
+ FD_ZERO (&fds);
+ FD_SET (r, &fds);
+ struct timeval timeout;
+ timeout.tv_sec = timeout_ / 1000;
+ timeout.tv_usec = timeout_ % 1000 * 1000;
+ int rc = select (r + 1, &fds, NULL, NULL, &timeout);
+ if (unlikely (rc < 0)) {
+ zmq_assert (errno == EINTR);
+ return -1;
+ }
+ else if (unlikely (rc == 0)) {
+ errno = EAGAIN;
+ return -1;
+ }
+ zmq_assert (rc == 1);
+
+#else
+#error
+#endif
+
+ // The file descriptor is ready for reading. Extract one command out of it.
+ ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
+ if (unlikely (rc < 0 && errno == EINTR))
+ return -1;
+ zmq_assert (nbytes == sizeof (command_t));
+ return 0;
+}
+
+#if defined ZMQ_RCVTIMEO_BASED_ON_SELECT
+#undef ZMQ_RCVTIMEO_BASED_ON_SELECT
+#endif
+#if defined ZMQ_RCVTIMEO_BASED_ON_POLL
+#undef ZMQ_RCVTIMEO_BASED_ON_POLL
+#endif
+