summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-08-27 15:01:38 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-08-27 15:01:38 +0200
commit56faac7f19bf2a6d2c7b6e0c2e35fcb667a72a48 (patch)
tree17e9f7f3009b5fb7af2ab68de149b8938146e881
parent3cb84b5ceac0f8652a99ec61152a865292e02cf1 (diff)
zmq_poll returns prematurely even if infinite timeout is set - fixed
-rw-r--r--src/zmq.cpp204
1 files changed, 114 insertions, 90 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp
index d5e9673..0c1cefc 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -375,6 +375,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
errno = EFAULT;
return -1;
}
+
pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
zmq_assert (pollfds);
@@ -405,55 +406,67 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
int timeout = timeout_ > 0 ? timeout_ / 1000 : -1;
int nevents = 0;
- // Wait for events. Ignore interrupts if there's infinite timeout.
while (true) {
- int rc = poll (pollfds, nitems_, timeout);
- if (rc == -1 && errno == EINTR) {
- if (timeout_ < 0)
- continue;
- else {
- // TODO: Calculate remaining timeout and restart poll ().
- free (pollfds);
- return 0;
+
+ // Wait for events. Ignore interrupts if there's infinite timeout.
+ while (true) {
+ int rc = poll (pollfds, nitems_, timeout);
+ if (rc == -1 && errno == EINTR) {
+ if (timeout_ < 0)
+ continue;
+ else {
+ // TODO: Calculate remaining timeout and restart poll ().
+ free (pollfds);
+ return 0;
+ }
}
+ errno_assert (rc >= 0);
+ break;
}
- errno_assert (rc >= 0);
- break;
- }
- // Check for the events.
- for (int i = 0; i != nitems_; i++) {
+ // Check for the events.
+ for (int i = 0; i != nitems_; i++) {
- items_ [i].revents = 0;
+ items_ [i].revents = 0;
- // The poll item is a 0MQ socket. Retrieve pending events
- // using the ZMQ_EVENTS socket option.
- if (items_ [i].socket && (pollfds [i].revents & POLLIN)) {
- size_t zmq_events_size = sizeof (uint32_t);
- uint32_t zmq_events;
- if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
- &zmq_events_size) == -1) {
- free (pollfds);
- return -1;
+ // The poll item is a 0MQ socket. Retrieve pending events
+ // using the ZMQ_EVENTS socket option.
+ if (items_ [i].socket && (pollfds [i].revents & POLLIN)) {
+ size_t zmq_events_size = sizeof (uint32_t);
+ uint32_t zmq_events;
+ if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
+ &zmq_events_size) == -1) {
+ free (pollfds);
+ return -1;
+ }
+ if ((items_ [i].events & ZMQ_POLLOUT) &&
+ (zmq_events & ZMQ_POLLOUT))
+ items_ [i].revents |= ZMQ_POLLOUT;
+ if ((items_ [i].events & ZMQ_POLLIN) &&
+ (zmq_events & ZMQ_POLLIN))
+ items_ [i].revents |= ZMQ_POLLIN;
}
- if ((items_ [i].events & ZMQ_POLLOUT) && (zmq_events & ZMQ_POLLOUT))
- items_ [i].revents |= ZMQ_POLLOUT;
- if ((items_ [i].events & ZMQ_POLLIN) && (zmq_events & ZMQ_POLLIN))
- items_ [i].revents |= ZMQ_POLLIN;
- }
- // Else, the poll item is a raw file descriptor, simply convert
- // the events to zmq_pollitem_t-style format.
- else {
- if (pollfds [i].revents & POLLIN)
- items_ [i].revents |= ZMQ_POLLIN;
- if (pollfds [i].revents & POLLOUT)
- items_ [i].revents |= ZMQ_POLLOUT;
- if (pollfds [i].revents & ~(POLLIN | POLLOUT))
- items_ [i].revents |= ZMQ_POLLERR;
+ // Else, the poll item is a raw file descriptor, simply convert
+ // the events to zmq_pollitem_t-style format.
+ else {
+ if (pollfds [i].revents & POLLIN)
+ items_ [i].revents |= ZMQ_POLLIN;
+ if (pollfds [i].revents & POLLOUT)
+ items_ [i].revents |= ZMQ_POLLOUT;
+ if (pollfds [i].revents & ~(POLLIN | POLLOUT))
+ items_ [i].revents |= ZMQ_POLLERR;
+ }
+
+ if (items_ [i].revents)
+ nevents++;
}
- if (items_ [i].revents)
- nevents++;
+ // If timeout is set to infinite and we have to events to return
+ // we can restart the polling.
+ if (timeout == -1 && nevents == 0)
+ continue;
+
+ break;
}
free (pollfds);
@@ -505,73 +518,84 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
int nevents = 0;
fd_set inset, outset, errset;
- // Wait for events. Ignore interrupts if there's infinite timeout.
while (true) {
- memcpy (&inset, &pollset_in, sizeof (fd_set));
- memcpy (&outset, &pollset_out, sizeof (fd_set));
- memcpy (&errset, &pollset_err, sizeof (fd_set));
- int rc = select (maxfd, &inset, &outset, &errset,
- (timeout_ < 0) ? NULL : &timeout);
+
+ // Wait for events. Ignore interrupts if there's infinite timeout.
+ while (true) {
+ memcpy (&inset, &pollset_in, sizeof (fd_set));
+ memcpy (&outset, &pollset_out, sizeof (fd_set));
+ memcpy (&errset, &pollset_err, sizeof (fd_set));
+ int rc = select (maxfd, &inset, &outset, &errset,
+ (timeout_ < 0) ? NULL : &timeout);
#if defined ZMQ_HAVE_WINDOWS
- wsa_assert (rc != SOCKET_ERROR);
+ wsa_assert (rc != SOCKET_ERROR);
#else
- if (rc == -1 && errno == EINTR) {
- if (timeout_ < 0)
- continue;
- else
- // TODO: Calculate remaining timeout and restart select ().
- return 0;
- }
- errno_assert (rc >= 0);
+ if (rc == -1 && errno == EINTR) {
+ if (timeout_ < 0)
+ continue;
+ else
+ // TODO: Calculate remaining timeout and restart select ().
+ return 0;
+ }
+ errno_assert (rc >= 0);
#endif
- break;
- }
+ break;
+ }
- // Check for the events.
- for (int i = 0; i != nitems_; i++) {
+ // Check for the events.
+ for (int i = 0; i != nitems_; i++) {
- items_ [i].revents = 0;
+ items_ [i].revents = 0;
- // The poll item is a 0MQ socket. Retrieve pending events
- // using the ZMQ_EVENTS socket option.
- if (items_ [i].socket) {
- size_t zmq_fd_size = sizeof (zmq::fd_t);
- zmq::fd_t notify_fd;
- if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
- &zmq_fd_size) == -1)
- return -1;
- if (FD_ISSET (notify_fd, &inset)) {
- size_t zmq_events_size = sizeof (uint32_t);
- uint32_t zmq_events;
- if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
- &zmq_events_size) == -1)
+ // The poll item is a 0MQ socket. Retrieve pending events
+ // using the ZMQ_EVENTS socket option.
+ if (items_ [i].socket) {
+ size_t zmq_fd_size = sizeof (zmq::fd_t);
+ zmq::fd_t notify_fd;
+ if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
+ &zmq_fd_size) == -1)
return -1;
- if ((items_ [i].events & ZMQ_POLLOUT) &&
- (zmq_events & ZMQ_POLLOUT))
- items_ [i].revents |= ZMQ_POLLOUT;
- if ((items_ [i].events & ZMQ_POLLIN) &&
- (zmq_events & ZMQ_POLLIN))
+ if (FD_ISSET (notify_fd, &inset)) {
+ size_t zmq_events_size = sizeof (uint32_t);
+ uint32_t zmq_events;
+ if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
+ &zmq_events_size) == -1)
+ return -1;
+ if ((items_ [i].events & ZMQ_POLLOUT) &&
+ (zmq_events & ZMQ_POLLOUT))
+ items_ [i].revents |= ZMQ_POLLOUT;
+ if ((items_ [i].events & ZMQ_POLLIN) &&
+ (zmq_events & ZMQ_POLLIN))
+ items_ [i].revents |= ZMQ_POLLIN;
+ }
+ }
+ // Else, the poll item is a raw file descriptor, simply convert
+ // the events to zmq_pollitem_t-style format.
+ else {
+ if (FD_ISSET (items_ [i].fd, &inset))
items_ [i].revents |= ZMQ_POLLIN;
+ if (FD_ISSET (items_ [i].fd, &outset))
+ items_ [i].revents |= ZMQ_POLLOUT;
+ if (FD_ISSET (items_ [i].fd, &errset))
+ items_ [i].revents |= ZMQ_POLLERR;
}
+
+ if (items_ [i].revents)
+ nevents++;
}
- // Else, the poll item is a raw file descriptor, simply convert
- // the events to zmq_pollitem_t-style format.
- else {
- if (FD_ISSET (items_ [i].fd, &inset))
- items_ [i].revents |= ZMQ_POLLIN;
- if (FD_ISSET (items_ [i].fd, &outset))
- items_ [i].revents |= ZMQ_POLLOUT;
- if (FD_ISSET (items_ [i].fd, &errset))
- items_ [i].revents |= ZMQ_POLLERR;
- }
- if (items_ [i].revents)
- nevents++;
+ // If timeout is set to infinite and we have to events to return
+ // we can restart the polling.
+ if (timeout_ < 0 && nevents == 0)
+ continue;
+
+ break;
}
return nevents;
#else
+ // Exotic platforms that support neither poll() nor select().
errno = ENOTSUP;
return -1;
#endif