summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-10-13 21:39:20 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-10-13 21:39:20 +0200
commite2167cecaefec6557c7a5712fb75e51487ff69a6 (patch)
tree5e4a836725a06d7d74b9ff97d82c839f7cf3147e
parent9d96e0037a9d027fd286f771fa2a8db5def485c8 (diff)
Precise timouts in zmq_poll implemented
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--src/zmq.cpp127
1 files changed, 90 insertions, 37 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp
index dad4367..9a15b6b 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -384,6 +384,10 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
return -1;
}
+ zmq::clock_t clock;
+ uint64_t now = 0;
+ uint64_t end = 0;
+
pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
zmq_assert (pollfds);
@@ -413,16 +417,21 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
bool first_pass = true;
int nevents = 0;
- if (timeout_ >= 0)
- timeout_ /= 1000;
- else
- timeout_ = -1;
while (true) {
+ // Compute the timeout for the subsequent poll.
+ int timeout;
+ if (first_pass)
+ timeout = 0;
+ else if (timeout_ < 0)
+ timeout = -1;
+ else
+ timeout = end - now;
+
// Wait for events.
while (true) {
- int rc = poll (pollfds, nitems_, first_pass ? 0 : timeout_);
+ int rc = poll (pollfds, nitems_, timeout);
if (rc == -1 && errno == EINTR) {
free (pollfds);
return -1;
@@ -468,22 +477,33 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
nevents++;
}
- // If there are no events from the first pass (the one with no
- // timeout), do at least the second pass so that we wait.
- if (first_pass && nevents == 0 && timeout_ != 0) {
- first_pass = false;
- continue;
- }
+ // If timout is zero, exit immediately whether there are events or not.
+ if (timeout_ == 0)
+ break;
- // If timeout is set to infinite and we have to events to return
- // we can restart the polling.
- if (timeout_ == -1 && nevents == 0)
+ // If there are events to return, we can exit immediately.
+ if (nevents)
+ break;
+
+ // At this point we are meant to wait for events but there are none.
+ // If timeout is infinite we can just loop until we get some events.
+ if (timeout_ < 0)
continue;
- // TODO: if nevents is zero recompute timeout and loop
- // if it is not yet reached.
+ // The timeout is finite and there are no events. In the first pass
+ // we get a timestamp of when the polling have begun. (We assume that
+ // first pass have taken negligible time). We also compute the time
+ // when the polling should time out.
+ if (first_pass) {
+ now = clock.now_ms ();
+ end = now + (timeout_ / 1000);
+ continue;
+ }
- break;
+ // Find out whether timeout have expired.
+ now = clock.now_ms ();
+ if (now >= end)
+ break;
}
free (pollfds);
@@ -491,6 +511,19 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
#elif defined ZMQ_POLL_BASED_ON_SELECT
+ if (!items_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ zmq::clock_t clock;
+ uint64_t now = 0;
+ uint64_t end = 0;
+
+ // Ensure we do not attempt to select () on more than FD_SETSIZE
+ // file descriptors.
+ zmq_assert (nitems_ <= FD_SETSIZE);
+
fd_set pollset_in;
FD_ZERO (&pollset_in);
fd_set pollset_out;
@@ -500,10 +533,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
zmq::fd_t maxfd = 0;
- // Ensure we do not attempt to select () on more than FD_SETSIZE
- // file descriptors.
- zmq_assert (nitems_ <= FD_SETSIZE);
-
// Build the fd_sets for passing to select ().
for (int i = 0; i != nitems_; i++) {
@@ -536,11 +565,25 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
}
bool first_pass = true;
- timeval zero_timeout = {0, 0};
- timeval timeout = {timeout_ / 1000000, timeout_ % 1000000};
int nevents = 0;
fd_set inset, outset, errset;
+ // Compute the timeout for the subsequent poll.
+ timeval timeout;
+ timeval *ptimeout;
+ if (first_pass) {
+ timeout.tv_sec = 0;
+ timeout.tv_usec = 0;
+ ptimeout = &timeout;
+ }
+ else if (timeout_ < 0)
+ ptimeout = NULL;
+ else {
+ timeout.tv_sec = (long) ((end - now) / 1000);
+ timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
+ ptimeout = &timeout;
+ }
+
while (true) {
// Wait for events. Ignore interrupts if there's infinite timeout.
@@ -548,8 +591,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
memcpy (&inset, &pollset_in, sizeof (fd_set));
memcpy (&outset, &pollset_out, sizeof (fd_set));
memcpy (&errset, &pollset_err, sizeof (fd_set));
- int rc = select (maxfd + 1, &inset, &outset, &errset,
- first_pass ? &zero_timeout : (timeout_ < 0 ? NULL : &timeout));
+ int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
#if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
@@ -595,22 +637,33 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
nevents++;
}
- // If there are no events from the first pass (the one with no
- // timout), do at least the second pass so that we wait.
- if (first_pass && nevents == 0 && timeout_ != 0) {
- first_pass = false;
- continue;
- }
+ // If timout is zero, exit immediately whether there are events or not.
+ if (timeout_ == 0)
+ break;
- // If timeout is set to infinite and we have to events to return
- // we can restart the polling.
- if (timeout_ < 0 && nevents == 0)
+ // If there are events to return, we can exit immediately.
+ if (nevents)
+ break;
+
+ // At this point we are meant to wait for events but there are none.
+ // If timeout is infinite we can just loop until we get some events.
+ if (timeout_ < 0)
continue;
- // TODO: if nevents is zero recompute timeout and loop
- // if it is not yet reached.
+ // The timeout is finite and there are no events. In the first pass
+ // we get a timestamp of when the polling have begun. (We assume that
+ // first pass have taken negligible time). We also compute the time
+ // when the polling should time out.
+ if (first_pass) {
+ now = clock.now_ms ();
+ end = now + (timeout_ / 1000);
+ continue;
+ }
- break;
+ // Find out whether timeout have expired.
+ now = clock.now_ms ();
+ if (now >= end)
+ break;
}
return nevents;