summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2010-01-04 15:46:20 +0100
committerMartin Sustrik <sustrik@fastmq.commkdir>2010-01-04 15:46:20 +0100
commitf2b235db165e459f7f265993477dae0dc987125a (patch)
tree4c51a044bb8a66d4787297540d78d88434632732
parent7884f4541aa6f95b76e0b2429baaf11108c543a1 (diff)
ZMQII-29: Add timeout to zmq_poll function
-rw-r--r--bindings/c/zmq.h2
-rw-r--r--bindings/cpp/zmq.hpp4
-rw-r--r--man/man3/zmq_poll.310
-rw-r--r--src/zmq.cpp23
4 files changed, 29 insertions, 10 deletions
diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h
index 7d3544e..7ad70e7 100644
--- a/bindings/c/zmq.h
+++ b/bindings/c/zmq.h
@@ -202,7 +202,7 @@ typedef struct
short revents;
} zmq_pollitem_t;
-ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems);
+ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
////////////////////////////////////////////////////////////////////////////////
// Helper functions.
diff --git a/bindings/cpp/zmq.hpp b/bindings/cpp/zmq.hpp
index 4349f0b..6d5bbab 100644
--- a/bindings/cpp/zmq.hpp
+++ b/bindings/cpp/zmq.hpp
@@ -33,9 +33,9 @@ namespace zmq
typedef zmq_free_fn free_fn;
typedef zmq_pollitem_t pollitem_t;
- inline int poll (zmq_pollitem_t *items_, int nitems_)
+ inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1)
{
- return zmq_poll (items_, nitems_);
+ return zmq_poll (items_, nitems_, timeout_);
}
class error_t : public std::exception
diff --git a/man/man3/zmq_poll.3 b/man/man3/zmq_poll.3
index 9782691..82277e3 100644
--- a/man/man3/zmq_poll.3
+++ b/man/man3/zmq_poll.3
@@ -2,7 +2,7 @@
.SH NAME
zmq_poll \- polls for events on a set of 0MQ and POSIX sockets
.SH SYNOPSIS
-.B int zmq_poll (zmq_pollitem_t *items, int nitems);
+.B int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
.SH DESCRIPTION
Waits for the events specified by
.IR items
@@ -31,7 +31,7 @@ specifies which events to wait for. It's a combination of the values below.
Once the call exits,
.IR revent
will be filled with events that have actually occured on the socket. The field
-will contain a combination of the following values.
+will contain a combination of the values below.
.IP "\fBZMQ_POLLIN\fP"
poll for incoming messages.
@@ -40,6 +40,12 @@ wait while message can be set socket. Poll will return if a message of at least
one byte can be written to the socket. However, there is no guarantee that
arbitrarily large message can be sent.
+.IR timeout
+argument specifies an upper limit on the time for which
+.IR zmq_poll
+will block, in microseconds. Specifying a negative value in timeout means
+an infinite timeout.
+
.SH RETURN VALUE
Function returns number of items signaled or -1 in the case of error.
.SH ERRORS
diff --git a/src/zmq.cpp b/src/zmq.cpp
index cce07af..581a456 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -264,7 +264,7 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
}
-int zmq_poll (zmq_pollitem_t *items_, int nitems_)
+int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
@@ -321,6 +321,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_)
npollfds++;
}
+ int timeout = timeout_ > 0 ? timeout_ / 1000 : -1;
int nevents = 0;
bool initial = true;
while (!nevents) {
@@ -328,10 +329,16 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_)
// Wait for activity. In the first iteration just check for events,
// don't wait. Waiting would prevent exiting on any events that may
// already be signaled on 0MQ sockets.
- int rc = poll (pollfds, npollfds, initial ? 0 : -1);
+ int rc = poll (pollfds, npollfds, initial ? 0 : timeout);
if (rc == -1 && errno == EINTR)
continue;
errno_assert (rc >= 0);
+
+ // If timeout was hit with no events signaled, return zero.
+ if (!initial && rc == 0)
+ return 0;
+
+ // From now on, perform blocking polling.
initial = false;
// Process 0MQ commands if needed.
@@ -426,6 +433,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_)
maxfd = notify_fd;
}
+ timeval timeout = {timeout_ / 1000000, timeout_ % 1000000};
+ timeval zero_timeout = {0, 0};
int nevents = 0;
bool initial = true;
while (!nevents) {
@@ -433,17 +442,21 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_)
// Wait for activity. In the first iteration just check for events,
// don't wait. Waiting would prevent exiting on any events that may
// already be signaled on 0MQ sockets.
- timeval timeout = {0, 0};
int rc = select (maxfd, &pollset_in, &pollset_out, &pollset_err,
- initial ? &timeout : NULL);
+ initial ? &zero_timeout : &timeout);
#if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
if (rc == -1 && errno == EINTR)
continue;
#endif
-
errno_assert (rc >= 0);
+
+ // If timeout was hit with no events signaled, return zero.
+ if (!initial && rc == 0)
+ return 0;
+
+ // From now on, perform blocking select.
initial = false;
// Process 0MQ commands if needed.