diff options
-rw-r--r-- | src/zmq.cpp | 127 |
1 files changed, 90 insertions, 37 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp index dad4367..9a15b6b 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -384,6 +384,10 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) return -1; } + zmq::clock_t clock; + uint64_t now = 0; + uint64_t end = 0; + pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); zmq_assert (pollfds); @@ -413,16 +417,21 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) bool first_pass = true; int nevents = 0; - if (timeout_ >= 0) - timeout_ /= 1000; - else - timeout_ = -1; while (true) { + // Compute the timeout for the subsequent poll. + int timeout; + if (first_pass) + timeout = 0; + else if (timeout_ < 0) + timeout = -1; + else + timeout = end - now; + // Wait for events. while (true) { - int rc = poll (pollfds, nitems_, first_pass ? 0 : timeout_); + int rc = poll (pollfds, nitems_, timeout); if (rc == -1 && errno == EINTR) { free (pollfds); return -1; @@ -468,22 +477,33 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) nevents++; } - // If there are no events from the first pass (the one with no - // timeout), do at least the second pass so that we wait. - if (first_pass && nevents == 0 && timeout_ != 0) { - first_pass = false; - continue; - } + // If timout is zero, exit immediately whether there are events or not. + if (timeout_ == 0) + break; - // If timeout is set to infinite and we have to events to return - // we can restart the polling. - if (timeout_ == -1 && nevents == 0) + // If there are events to return, we can exit immediately. + if (nevents) + break; + + // At this point we are meant to wait for events but there are none. + // If timeout is infinite we can just loop until we get some events. + if (timeout_ < 0) continue; - // TODO: if nevents is zero recompute timeout and loop - // if it is not yet reached. + // The timeout is finite and there are no events. In the first pass + // we get a timestamp of when the polling have begun. (We assume that + // first pass have taken negligible time). We also compute the time + // when the polling should time out. + if (first_pass) { + now = clock.now_ms (); + end = now + (timeout_ / 1000); + continue; + } - break; + // Find out whether timeout have expired. + now = clock.now_ms (); + if (now >= end) + break; } free (pollfds); @@ -491,6 +511,19 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) #elif defined ZMQ_POLL_BASED_ON_SELECT + if (!items_) { + errno = EFAULT; + return -1; + } + + zmq::clock_t clock; + uint64_t now = 0; + uint64_t end = 0; + + // Ensure we do not attempt to select () on more than FD_SETSIZE + // file descriptors. + zmq_assert (nitems_ <= FD_SETSIZE); + fd_set pollset_in; FD_ZERO (&pollset_in); fd_set pollset_out; @@ -500,10 +533,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) zmq::fd_t maxfd = 0; - // Ensure we do not attempt to select () on more than FD_SETSIZE - // file descriptors. - zmq_assert (nitems_ <= FD_SETSIZE); - // Build the fd_sets for passing to select (). for (int i = 0; i != nitems_; i++) { @@ -536,11 +565,25 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) } bool first_pass = true; - timeval zero_timeout = {0, 0}; - timeval timeout = {timeout_ / 1000000, timeout_ % 1000000}; int nevents = 0; fd_set inset, outset, errset; + // Compute the timeout for the subsequent poll. + timeval timeout; + timeval *ptimeout; + if (first_pass) { + timeout.tv_sec = 0; + timeout.tv_usec = 0; + ptimeout = &timeout; + } + else if (timeout_ < 0) + ptimeout = NULL; + else { + timeout.tv_sec = (long) ((end - now) / 1000); + timeout.tv_usec = (long) ((end - now) % 1000 * 1000); + ptimeout = &timeout; + } + while (true) { // Wait for events. Ignore interrupts if there's infinite timeout. @@ -548,8 +591,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) memcpy (&inset, &pollset_in, sizeof (fd_set)); memcpy (&outset, &pollset_out, sizeof (fd_set)); memcpy (&errset, &pollset_err, sizeof (fd_set)); - int rc = select (maxfd + 1, &inset, &outset, &errset, - first_pass ? &zero_timeout : (timeout_ < 0 ? NULL : &timeout)); + int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout); #if defined ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); #else @@ -595,22 +637,33 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) nevents++; } - // If there are no events from the first pass (the one with no - // timout), do at least the second pass so that we wait. - if (first_pass && nevents == 0 && timeout_ != 0) { - first_pass = false; - continue; - } + // If timout is zero, exit immediately whether there are events or not. + if (timeout_ == 0) + break; - // If timeout is set to infinite and we have to events to return - // we can restart the polling. - if (timeout_ < 0 && nevents == 0) + // If there are events to return, we can exit immediately. + if (nevents) + break; + + // At this point we are meant to wait for events but there are none. + // If timeout is infinite we can just loop until we get some events. + if (timeout_ < 0) continue; - // TODO: if nevents is zero recompute timeout and loop - // if it is not yet reached. + // The timeout is finite and there are no events. In the first pass + // we get a timestamp of when the polling have begun. (We assume that + // first pass have taken negligible time). We also compute the time + // when the polling should time out. + if (first_pass) { + now = clock.now_ms (); + end = now + (timeout_ / 1000); + continue; + } - break; + // Find out whether timeout have expired. + now = clock.now_ms (); + if (now >= end) + break; } return nevents; |