summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-08-28 07:02:22 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-08-28 07:02:22 +0200
commit035c937ee7452708a9dd3abd851fda6a753808f4 (patch)
tree429e303e6be9fafb82e7bd2db058e37824b489ca
parent67aa788577fb49474dd7329b14316d25f1b3c08b (diff)
zmq_poll: account for the fact that ZMQ_FD is edge-triggered
-rw-r--r--src/zmq.cpp31
1 files changed, 24 insertions, 7 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 0c1cefc..6cf230c 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -403,6 +403,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
}
}
+ bool first_pass = true;
int timeout = timeout_ > 0 ? timeout_ / 1000 : -1;
int nevents = 0;
@@ -410,7 +411,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// Wait for events. Ignore interrupts if there's infinite timeout.
while (true) {
- int rc = poll (pollfds, nitems_, timeout);
+ int rc = poll (pollfds, nitems_, first_pass ? 0 : timeout);
if (rc == -1 && errno == EINTR) {
if (timeout_ < 0)
continue;
@@ -431,7 +432,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
- if (items_ [i].socket && (pollfds [i].revents & POLLIN)) {
+ if (items_ [i].socket) {
size_t zmq_events_size = sizeof (uint32_t);
uint32_t zmq_events;
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
@@ -461,6 +462,13 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
nevents++;
}
+ // If there are no events from the first pass (the one with no
+ // timout), do at least the second pass so that we wait.
+ if (first_pass && nevents == 0 && timeout_ != 0) {
+ first_pass = false;
+ continue;
+ }
+
// If timeout is set to infinite and we have to events to return
// we can restart the polling.
if (timeout == -1 && nevents == 0)
@@ -514,6 +522,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
}
}
+ bool first_pass = true;
+ timeval zero_timeout = {0, 0};
timeval timeout = {timeout_ / 1000000, timeout_ % 1000000};
int nevents = 0;
fd_set inset, outset, errset;
@@ -526,7 +536,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
memcpy (&outset, &pollset_out, sizeof (fd_set));
memcpy (&errset, &pollset_err, sizeof (fd_set));
int rc = select (maxfd, &inset, &outset, &errset,
- (timeout_ < 0) ? NULL : &timeout);
+ first_pass ? &zero_timeout : (timeout_ < 0 ? NULL : &timeout));
#if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
@@ -553,19 +563,19 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
size_t zmq_fd_size = sizeof (zmq::fd_t);
zmq::fd_t notify_fd;
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
- &zmq_fd_size) == -1)
+ &zmq_fd_size) == -1)
return -1;
if (FD_ISSET (notify_fd, &inset)) {
size_t zmq_events_size = sizeof (uint32_t);
uint32_t zmq_events;
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
- &zmq_events_size) == -1)
+ &zmq_events_size) == -1)
return -1;
if ((items_ [i].events & ZMQ_POLLOUT) &&
- (zmq_events & ZMQ_POLLOUT))
+ (zmq_events & ZMQ_POLLOUT))
items_ [i].revents |= ZMQ_POLLOUT;
if ((items_ [i].events & ZMQ_POLLIN) &&
- (zmq_events & ZMQ_POLLIN))
+ (zmq_events & ZMQ_POLLIN))
items_ [i].revents |= ZMQ_POLLIN;
}
}
@@ -584,6 +594,13 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
nevents++;
}
+ // If there are no events from the first pass (the one with no
+ // timout), do at least the second pass so that we wait.
+ if (first_pass && nevents == 0 && timeout_ != 0) {
+ first_pass = false;
+ continue;
+ }
+
// If timeout is set to infinite and we have to events to return
// we can restart the polling.
if (timeout_ < 0 && nevents == 0)