diff options
| author | Martin Sustrik <sustrik@250bpm.com> | 2010-08-27 15:01:38 +0200 | 
|---|---|---|
| committer | Martin Sustrik <sustrik@250bpm.com> | 2010-08-27 15:01:38 +0200 | 
| commit | 56faac7f19bf2a6d2c7b6e0c2e35fcb667a72a48 (patch) | |
| tree | 17e9f7f3009b5fb7af2ab68de149b8938146e881 | |
| parent | 3cb84b5ceac0f8652a99ec61152a865292e02cf1 (diff) | |
zmq_poll returns prematurely even if infinite timeout is set - fixed
| -rw-r--r-- | src/zmq.cpp | 204 | 
1 files changed, 114 insertions, 90 deletions
| diff --git a/src/zmq.cpp b/src/zmq.cpp index d5e9673..0c1cefc 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -375,6 +375,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)          errno = EFAULT;          return -1;      } +      pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));      zmq_assert (pollfds); @@ -405,55 +406,67 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)      int timeout = timeout_ > 0 ? timeout_ / 1000 : -1;      int nevents = 0; -    //  Wait for events. Ignore interrupts if there's infinite timeout.      while (true) { -        int rc = poll (pollfds, nitems_, timeout); -        if (rc == -1 && errno == EINTR) { -            if (timeout_ < 0) -                continue; -            else { -                //  TODO: Calculate remaining timeout and restart poll (). -                free (pollfds); -                return 0; + +        //  Wait for events. Ignore interrupts if there's infinite timeout. +        while (true) { +            int rc = poll (pollfds, nitems_, timeout); +            if (rc == -1 && errno == EINTR) { +                if (timeout_ < 0) +                    continue; +                else { +                    //  TODO: Calculate remaining timeout and restart poll (). +                    free (pollfds); +                    return 0; +                }              } +            errno_assert (rc >= 0); +            break;          } -        errno_assert (rc >= 0); -        break; -    } -    //  Check for the events. -    for (int i = 0; i != nitems_; i++) { +        //  Check for the events. +        for (int i = 0; i != nitems_; i++) { -        items_ [i].revents = 0; +            items_ [i].revents = 0; -        //  The poll item is a 0MQ socket. Retrieve pending events -        //  using the ZMQ_EVENTS socket option. -        if (items_ [i].socket && (pollfds [i].revents & POLLIN)) { -            size_t zmq_events_size = sizeof (uint32_t); -            uint32_t zmq_events; -            if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, -                &zmq_events_size) == -1) { -                free (pollfds); -                return -1; +            //  The poll item is a 0MQ socket. Retrieve pending events +            //  using the ZMQ_EVENTS socket option. +            if (items_ [i].socket && (pollfds [i].revents & POLLIN)) { +                size_t zmq_events_size = sizeof (uint32_t); +                uint32_t zmq_events; +                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, +                    &zmq_events_size) == -1) { +                    free (pollfds); +                    return -1; +                } +                if ((items_ [i].events & ZMQ_POLLOUT) && +                      (zmq_events & ZMQ_POLLOUT)) +                    items_ [i].revents |= ZMQ_POLLOUT; +                if ((items_ [i].events & ZMQ_POLLIN) && +                      (zmq_events & ZMQ_POLLIN)) +                    items_ [i].revents |= ZMQ_POLLIN;              } -            if ((items_ [i].events & ZMQ_POLLOUT) && (zmq_events & ZMQ_POLLOUT)) -                items_ [i].revents |= ZMQ_POLLOUT; -            if ((items_ [i].events & ZMQ_POLLIN) && (zmq_events & ZMQ_POLLIN)) -                items_ [i].revents |= ZMQ_POLLIN; -        } -        //  Else, the poll item is a raw file descriptor, simply convert -        //  the events to zmq_pollitem_t-style format. -        else { -            if (pollfds [i].revents & POLLIN) -                items_ [i].revents |= ZMQ_POLLIN; -            if (pollfds [i].revents & POLLOUT) -                items_ [i].revents |= ZMQ_POLLOUT; -            if (pollfds [i].revents & ~(POLLIN | POLLOUT)) -                items_ [i].revents |= ZMQ_POLLERR; +            //  Else, the poll item is a raw file descriptor, simply convert +            //  the events to zmq_pollitem_t-style format. +            else { +                if (pollfds [i].revents & POLLIN) +                    items_ [i].revents |= ZMQ_POLLIN; +                if (pollfds [i].revents & POLLOUT) +                    items_ [i].revents |= ZMQ_POLLOUT; +                if (pollfds [i].revents & ~(POLLIN | POLLOUT)) +                    items_ [i].revents |= ZMQ_POLLERR; +            } + +            if (items_ [i].revents) +                nevents++;          } -        if (items_ [i].revents) -            nevents++; +        //  If timeout is set to infinite and we have to events to return +        //  we can restart the polling. +        if (timeout == -1 && nevents == 0) +            continue; + +        break;      }      free (pollfds); @@ -505,73 +518,84 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)      int nevents = 0;      fd_set inset, outset, errset; -    //  Wait for events. Ignore interrupts if there's infinite timeout.      while (true) { -        memcpy (&inset, &pollset_in, sizeof (fd_set)); -        memcpy (&outset, &pollset_out, sizeof (fd_set)); -        memcpy (&errset, &pollset_err, sizeof (fd_set)); -        int rc = select (maxfd, &inset, &outset, &errset, -            (timeout_ < 0) ? NULL : &timeout); + +        //  Wait for events. Ignore interrupts if there's infinite timeout. +        while (true) { +            memcpy (&inset, &pollset_in, sizeof (fd_set)); +            memcpy (&outset, &pollset_out, sizeof (fd_set)); +            memcpy (&errset, &pollset_err, sizeof (fd_set)); +            int rc = select (maxfd, &inset, &outset, &errset, +                (timeout_ < 0) ? NULL : &timeout);  #if defined ZMQ_HAVE_WINDOWS -        wsa_assert (rc != SOCKET_ERROR); +            wsa_assert (rc != SOCKET_ERROR);  #else -        if (rc == -1 && errno == EINTR) { -            if (timeout_ < 0) -                continue; -            else -                //  TODO: Calculate remaining timeout and restart select (). -                return 0; -        } -        errno_assert (rc >= 0); +            if (rc == -1 && errno == EINTR) { +                if (timeout_ < 0) +                    continue; +                else +                    //  TODO: Calculate remaining timeout and restart select (). +                    return 0; +            } +            errno_assert (rc >= 0);  #endif -        break; -    } +            break; +        } -    //  Check for the events. -    for (int i = 0; i != nitems_; i++) { +        //  Check for the events. +        for (int i = 0; i != nitems_; i++) { -        items_ [i].revents = 0; +            items_ [i].revents = 0; -        //  The poll item is a 0MQ socket. Retrieve pending events -        //  using the ZMQ_EVENTS socket option. -        if (items_ [i].socket) { -            size_t zmq_fd_size = sizeof (zmq::fd_t); -            zmq::fd_t notify_fd; -            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, -                &zmq_fd_size) == -1) -                return -1; -            if (FD_ISSET (notify_fd, &inset)) { -                size_t zmq_events_size = sizeof (uint32_t); -                uint32_t zmq_events; -                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, -                    &zmq_events_size) == -1) +            //  The poll item is a 0MQ socket. Retrieve pending events +            //  using the ZMQ_EVENTS socket option. +            if (items_ [i].socket) { +                size_t zmq_fd_size = sizeof (zmq::fd_t); +                zmq::fd_t notify_fd; +                if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, +                    &zmq_fd_size) == -1)                      return -1; -                if ((items_ [i].events & ZMQ_POLLOUT) && -                    (zmq_events & ZMQ_POLLOUT)) -                    items_ [i].revents |= ZMQ_POLLOUT; -                if ((items_ [i].events & ZMQ_POLLIN) && -                    (zmq_events & ZMQ_POLLIN)) +                if (FD_ISSET (notify_fd, &inset)) { +                    size_t zmq_events_size = sizeof (uint32_t); +                    uint32_t zmq_events; +                    if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, +                        &zmq_events_size) == -1) +                        return -1; +                    if ((items_ [i].events & ZMQ_POLLOUT) && +                        (zmq_events & ZMQ_POLLOUT)) +                        items_ [i].revents |= ZMQ_POLLOUT; +                    if ((items_ [i].events & ZMQ_POLLIN) && +                        (zmq_events & ZMQ_POLLIN)) +                        items_ [i].revents |= ZMQ_POLLIN; +                } +            } +            //  Else, the poll item is a raw file descriptor, simply convert +            //  the events to zmq_pollitem_t-style format. +            else { +                if (FD_ISSET (items_ [i].fd, &inset))                      items_ [i].revents |= ZMQ_POLLIN; +                if (FD_ISSET (items_ [i].fd, &outset)) +                    items_ [i].revents |= ZMQ_POLLOUT; +                if (FD_ISSET (items_ [i].fd, &errset)) +                    items_ [i].revents |= ZMQ_POLLERR;              } + +            if (items_ [i].revents) +                nevents++;          } -        //  Else, the poll item is a raw file descriptor, simply convert -        //  the events to zmq_pollitem_t-style format. -        else { -            if (FD_ISSET (items_ [i].fd, &inset)) -                items_ [i].revents |= ZMQ_POLLIN; -            if (FD_ISSET (items_ [i].fd, &outset)) -                items_ [i].revents |= ZMQ_POLLOUT; -            if (FD_ISSET (items_ [i].fd, &errset)) -                items_ [i].revents |= ZMQ_POLLERR; -        } -        if (items_ [i].revents) -            nevents++; +        //  If timeout is set to infinite and we have to events to return +        //  we can restart the polling. +        if (timeout_ < 0 && nevents == 0) +            continue; + +        break;      }      return nevents;  #else +    //  Exotic platforms that support neither poll() nor select().      errno = ENOTSUP;      return -1;  #endif | 
