From 035c937ee7452708a9dd3abd851fda6a753808f4 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 28 Aug 2010 07:02:22 +0200 Subject: zmq_poll: account for the fact that ZMQ_FD is edge-triggered --- src/zmq.cpp | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/zmq.cpp b/src/zmq.cpp index 0c1cefc..6cf230c 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -403,6 +403,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) } } + bool first_pass = true; int timeout = timeout_ > 0 ? timeout_ / 1000 : -1; int nevents = 0; @@ -410,7 +411,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) // Wait for events. Ignore interrupts if there's infinite timeout. while (true) { - int rc = poll (pollfds, nitems_, timeout); + int rc = poll (pollfds, nitems_, first_pass ? 0 : timeout); if (rc == -1 && errno == EINTR) { if (timeout_ < 0) continue; @@ -431,7 +432,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) // The poll item is a 0MQ socket. Retrieve pending events // using the ZMQ_EVENTS socket option. - if (items_ [i].socket && (pollfds [i].revents & POLLIN)) { + if (items_ [i].socket) { size_t zmq_events_size = sizeof (uint32_t); uint32_t zmq_events; if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, @@ -461,6 +462,13 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) nevents++; } + // If there are no events from the first pass (the one with no + // timout), do at least the second pass so that we wait. + if (first_pass && nevents == 0 && timeout_ != 0) { + first_pass = false; + continue; + } + // If timeout is set to infinite and we have to events to return // we can restart the polling. if (timeout == -1 && nevents == 0) @@ -514,6 +522,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) } } + bool first_pass = true; + timeval zero_timeout = {0, 0}; timeval timeout = {timeout_ / 1000000, timeout_ % 1000000}; int nevents = 0; fd_set inset, outset, errset; @@ -526,7 +536,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) memcpy (&outset, &pollset_out, sizeof (fd_set)); memcpy (&errset, &pollset_err, sizeof (fd_set)); int rc = select (maxfd, &inset, &outset, &errset, - (timeout_ < 0) ? NULL : &timeout); + first_pass ? &zero_timeout : (timeout_ < 0 ? NULL : &timeout)); #if defined ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); #else @@ -553,19 +563,19 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) size_t zmq_fd_size = sizeof (zmq::fd_t); zmq::fd_t notify_fd; if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, - &zmq_fd_size) == -1) + &zmq_fd_size) == -1) return -1; if (FD_ISSET (notify_fd, &inset)) { size_t zmq_events_size = sizeof (uint32_t); uint32_t zmq_events; if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, - &zmq_events_size) == -1) + &zmq_events_size) == -1) return -1; if ((items_ [i].events & ZMQ_POLLOUT) && - (zmq_events & ZMQ_POLLOUT)) + (zmq_events & ZMQ_POLLOUT)) items_ [i].revents |= ZMQ_POLLOUT; if ((items_ [i].events & ZMQ_POLLIN) && - (zmq_events & ZMQ_POLLIN)) + (zmq_events & ZMQ_POLLIN)) items_ [i].revents |= ZMQ_POLLIN; } } @@ -584,6 +594,13 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) nevents++; } + // If there are no events from the first pass (the one with no + // timout), do at least the second pass so that we wait. + if (first_pass && nevents == 0 && timeout_ != 0) { + first_pass = false; + continue; + } + // If timeout is set to infinite and we have to events to return // we can restart the polling. if (timeout_ < 0 && nevents == 0) -- cgit v1.2.3