diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2010-08-27 15:01:38 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-08-27 15:01:38 +0200 |
commit | 56faac7f19bf2a6d2c7b6e0c2e35fcb667a72a48 (patch) | |
tree | 17e9f7f3009b5fb7af2ab68de149b8938146e881 | |
parent | 3cb84b5ceac0f8652a99ec61152a865292e02cf1 (diff) |
zmq_poll returns prematurely even if infinite timeout is set - fixed
-rw-r--r-- | src/zmq.cpp | 204 |
1 files changed, 114 insertions, 90 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp index d5e9673..0c1cefc 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -375,6 +375,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) errno = EFAULT; return -1; } + pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); zmq_assert (pollfds); @@ -405,55 +406,67 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) int timeout = timeout_ > 0 ? timeout_ / 1000 : -1; int nevents = 0; - // Wait for events. Ignore interrupts if there's infinite timeout. while (true) { - int rc = poll (pollfds, nitems_, timeout); - if (rc == -1 && errno == EINTR) { - if (timeout_ < 0) - continue; - else { - // TODO: Calculate remaining timeout and restart poll (). - free (pollfds); - return 0; + + // Wait for events. Ignore interrupts if there's infinite timeout. + while (true) { + int rc = poll (pollfds, nitems_, timeout); + if (rc == -1 && errno == EINTR) { + if (timeout_ < 0) + continue; + else { + // TODO: Calculate remaining timeout and restart poll (). + free (pollfds); + return 0; + } } + errno_assert (rc >= 0); + break; } - errno_assert (rc >= 0); - break; - } - // Check for the events. - for (int i = 0; i != nitems_; i++) { + // Check for the events. + for (int i = 0; i != nitems_; i++) { - items_ [i].revents = 0; + items_ [i].revents = 0; - // The poll item is a 0MQ socket. Retrieve pending events - // using the ZMQ_EVENTS socket option. - if (items_ [i].socket && (pollfds [i].revents & POLLIN)) { - size_t zmq_events_size = sizeof (uint32_t); - uint32_t zmq_events; - if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, - &zmq_events_size) == -1) { - free (pollfds); - return -1; + // The poll item is a 0MQ socket. Retrieve pending events + // using the ZMQ_EVENTS socket option. + if (items_ [i].socket && (pollfds [i].revents & POLLIN)) { + size_t zmq_events_size = sizeof (uint32_t); + uint32_t zmq_events; + if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, + &zmq_events_size) == -1) { + free (pollfds); + return -1; + } + if ((items_ [i].events & ZMQ_POLLOUT) && + (zmq_events & ZMQ_POLLOUT)) + items_ [i].revents |= ZMQ_POLLOUT; + if ((items_ [i].events & ZMQ_POLLIN) && + (zmq_events & ZMQ_POLLIN)) + items_ [i].revents |= ZMQ_POLLIN; } - if ((items_ [i].events & ZMQ_POLLOUT) && (zmq_events & ZMQ_POLLOUT)) - items_ [i].revents |= ZMQ_POLLOUT; - if ((items_ [i].events & ZMQ_POLLIN) && (zmq_events & ZMQ_POLLIN)) - items_ [i].revents |= ZMQ_POLLIN; - } - // Else, the poll item is a raw file descriptor, simply convert - // the events to zmq_pollitem_t-style format. - else { - if (pollfds [i].revents & POLLIN) - items_ [i].revents |= ZMQ_POLLIN; - if (pollfds [i].revents & POLLOUT) - items_ [i].revents |= ZMQ_POLLOUT; - if (pollfds [i].revents & ~(POLLIN | POLLOUT)) - items_ [i].revents |= ZMQ_POLLERR; + // Else, the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + else { + if (pollfds [i].revents & POLLIN) + items_ [i].revents |= ZMQ_POLLIN; + if (pollfds [i].revents & POLLOUT) + items_ [i].revents |= ZMQ_POLLOUT; + if (pollfds [i].revents & ~(POLLIN | POLLOUT)) + items_ [i].revents |= ZMQ_POLLERR; + } + + if (items_ [i].revents) + nevents++; } - if (items_ [i].revents) - nevents++; + // If timeout is set to infinite and we have to events to return + // we can restart the polling. + if (timeout == -1 && nevents == 0) + continue; + + break; } free (pollfds); @@ -505,73 +518,84 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) int nevents = 0; fd_set inset, outset, errset; - // Wait for events. Ignore interrupts if there's infinite timeout. while (true) { - memcpy (&inset, &pollset_in, sizeof (fd_set)); - memcpy (&outset, &pollset_out, sizeof (fd_set)); - memcpy (&errset, &pollset_err, sizeof (fd_set)); - int rc = select (maxfd, &inset, &outset, &errset, - (timeout_ < 0) ? NULL : &timeout); + + // Wait for events. Ignore interrupts if there's infinite timeout. + while (true) { + memcpy (&inset, &pollset_in, sizeof (fd_set)); + memcpy (&outset, &pollset_out, sizeof (fd_set)); + memcpy (&errset, &pollset_err, sizeof (fd_set)); + int rc = select (maxfd, &inset, &outset, &errset, + (timeout_ < 0) ? NULL : &timeout); #if defined ZMQ_HAVE_WINDOWS - wsa_assert (rc != SOCKET_ERROR); + wsa_assert (rc != SOCKET_ERROR); #else - if (rc == -1 && errno == EINTR) { - if (timeout_ < 0) - continue; - else - // TODO: Calculate remaining timeout and restart select (). - return 0; - } - errno_assert (rc >= 0); + if (rc == -1 && errno == EINTR) { + if (timeout_ < 0) + continue; + else + // TODO: Calculate remaining timeout and restart select (). + return 0; + } + errno_assert (rc >= 0); #endif - break; - } + break; + } - // Check for the events. - for (int i = 0; i != nitems_; i++) { + // Check for the events. + for (int i = 0; i != nitems_; i++) { - items_ [i].revents = 0; + items_ [i].revents = 0; - // The poll item is a 0MQ socket. Retrieve pending events - // using the ZMQ_EVENTS socket option. - if (items_ [i].socket) { - size_t zmq_fd_size = sizeof (zmq::fd_t); - zmq::fd_t notify_fd; - if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, - &zmq_fd_size) == -1) - return -1; - if (FD_ISSET (notify_fd, &inset)) { - size_t zmq_events_size = sizeof (uint32_t); - uint32_t zmq_events; - if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, - &zmq_events_size) == -1) + // The poll item is a 0MQ socket. Retrieve pending events + // using the ZMQ_EVENTS socket option. + if (items_ [i].socket) { + size_t zmq_fd_size = sizeof (zmq::fd_t); + zmq::fd_t notify_fd; + if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, + &zmq_fd_size) == -1) return -1; - if ((items_ [i].events & ZMQ_POLLOUT) && - (zmq_events & ZMQ_POLLOUT)) - items_ [i].revents |= ZMQ_POLLOUT; - if ((items_ [i].events & ZMQ_POLLIN) && - (zmq_events & ZMQ_POLLIN)) + if (FD_ISSET (notify_fd, &inset)) { + size_t zmq_events_size = sizeof (uint32_t); + uint32_t zmq_events; + if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, + &zmq_events_size) == -1) + return -1; + if ((items_ [i].events & ZMQ_POLLOUT) && + (zmq_events & ZMQ_POLLOUT)) + items_ [i].revents |= ZMQ_POLLOUT; + if ((items_ [i].events & ZMQ_POLLIN) && + (zmq_events & ZMQ_POLLIN)) + items_ [i].revents |= ZMQ_POLLIN; + } + } + // Else, the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + else { + if (FD_ISSET (items_ [i].fd, &inset)) items_ [i].revents |= ZMQ_POLLIN; + if (FD_ISSET (items_ [i].fd, &outset)) + items_ [i].revents |= ZMQ_POLLOUT; + if (FD_ISSET (items_ [i].fd, &errset)) + items_ [i].revents |= ZMQ_POLLERR; } + + if (items_ [i].revents) + nevents++; } - // Else, the poll item is a raw file descriptor, simply convert - // the events to zmq_pollitem_t-style format. - else { - if (FD_ISSET (items_ [i].fd, &inset)) - items_ [i].revents |= ZMQ_POLLIN; - if (FD_ISSET (items_ [i].fd, &outset)) - items_ [i].revents |= ZMQ_POLLOUT; - if (FD_ISSET (items_ [i].fd, &errset)) - items_ [i].revents |= ZMQ_POLLERR; - } - if (items_ [i].revents) - nevents++; + // If timeout is set to infinite and we have to events to return + // we can restart the polling. + if (timeout_ < 0 && nevents == 0) + continue; + + break; } return nevents; #else + // Exotic platforms that support neither poll() nor select(). errno = ENOTSUP; return -1; #endif |