diff options
| -rw-r--r-- | src/zmq.cpp | 227 | 
1 files changed, 90 insertions, 137 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp index ceac158..6ec8fbe 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -382,8 +382,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)          //  If the poll item is a 0MQ socket, we poll on the file descriptor          //  retrieved by the ZMQ_FD socket option. -        if (items_ [i].socket) { -            size_t zmq_fd_size = sizeof (int); +        if (items_ [i].socket && items_ [i].events) { +            size_t zmq_fd_size = sizeof (zmq::fd_t);              if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,                  &zmq_fd_size) == -1) {                  free (pollfds); @@ -410,9 +410,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)          if (rc == -1 && errno == EINTR) {              if (timeout_ < 0)                  continue; -            //  Interrupted, no way to determine how much time is remaining -            //  from timeout so just return for now.              else { +                //  TODO: Calculate remaining timeout and restart poll ().                  free (pollfds);                  return 0;              } @@ -428,15 +427,18 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)          //  The poll item is a 0MQ socket. Retrieve pending events          //  using the ZMQ_EVENTS socket option. -        if (items_ [i].socket) { -            if (pollfds [i].revents & POLLIN) { -                size_t zmq_events_size = sizeof (uint32_t); -                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, -                    &items_ [i].revents, &zmq_events_size) == -1) { -                    free (pollfds); -                    return -1; -                } +        if (items_ [i].socket && (pollfds [i].revents & POLLIN)) { +            size_t zmq_events_size = sizeof (uint32_t); +            uint32_t zmq_events; +            if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, +                &zmq_events_size) == -1) { +                free (pollfds); +                return -1;              } +            if ((items_ [i].events & ZMQ_POLLOUT) && (zmq_events & ZMQ_POLLOUT)) +                items_ [i].revents |= ZMQ_POLLOUT; +            if ((items_ [i].events & ZMQ_POLLIN) && (zmq_events & ZMQ_POLLIN)) +                items_ [i].revents |= ZMQ_POLLIN;          }          //  Else, the poll item is a raw file descriptor, simply convert          //  the events to zmq_pollitem_t-style format. @@ -465,152 +467,103 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)      fd_set pollset_err;      FD_ZERO (&pollset_err); -    zmq::app_thread_t *app_thread = NULL; -    int nsockets = 0; -    zmq::fd_t maxfd = zmq::retired_fd; -    zmq::fd_t notify_fd = zmq::retired_fd; +    zmq::fd_t maxfd = 0; +    //  Build the fd_sets for passing to select ().      for (int i = 0; i != nitems_; i++) { -        //  0MQ sockets. -        if (items_ [i].socket) { - -            //  Get the app_thread the socket is living in. If there are two -            //  sockets in the same pollset with different app threads, fail. -            zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; -            if (app_thread) { -                if (app_thread != s->get_thread ()) { -                    errno = EFAULT; -                    return -1; -                } -            } -            else -                app_thread = s->get_thread (); - -            nsockets++; -            continue; +        //  If the poll item is a 0MQ socket we are interested in input on the +        //  notification file descriptor retrieved by the ZMQ_FD socket option. +        if (items_ [i].socket && items_ [i].events) { +            size_t zmq_fd_size = sizeof (zmq::fd_t); +            zmq::fd_t notify_fd; +            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, +                &zmq_fd_size) == -1) +                return -1; +            FD_SET (notify_fd, &pollset_in); +            if (maxfd < notify_fd) +                maxfd = notify_fd;          } - -        //  Raw file descriptors. -        if (items_ [i].events & ZMQ_POLLIN) -            FD_SET (items_ [i].fd, &pollset_in); -        if (items_ [i].events & ZMQ_POLLOUT) -            FD_SET (items_ [i].fd, &pollset_out); -        if (items_ [i].events & ZMQ_POLLERR) -            FD_SET (items_ [i].fd, &pollset_err); -        if (maxfd == zmq::retired_fd || maxfd < items_ [i].fd) -            maxfd = items_ [i].fd; -    } - -    //  If there's at least one 0MQ socket in the pollset we have to poll -    //  for 0MQ commands. If ZMQ_POLL was not set, fail. -    if (nsockets) { -        notify_fd = app_thread->get_signaler ()->get_fd (); -        if (notify_fd == zmq::retired_fd) { -            errno = ENOTSUP; -            return -1; +        //  Else, the poll item is a raw file descriptor. Convert the poll item +        //  events to the appropriate fd_sets. +        else { +            if (items_ [i].events & ZMQ_POLLIN) +                FD_SET (items_ [i].fd, &pollset_in); +            if (items_ [i].events & ZMQ_POLLOUT) +                FD_SET (items_ [i].fd, &pollset_out); +            if (items_ [i].events & ZMQ_POLLERR) +                FD_SET (items_ [i].fd, &pollset_err); +            if (maxfd < items_ [i].fd) +                maxfd = items_ [i].fd;          } -        FD_SET (notify_fd, &pollset_in); -        if (maxfd == zmq::retired_fd || maxfd < notify_fd) -            maxfd = notify_fd;      } -    bool block = (timeout_ < 0);      timeval timeout = {timeout_ / 1000000, timeout_ % 1000000}; -    timeval zero_timeout = {0, 0};      int nevents = 0; - -    //  First iteration just check for events, don't block. Waiting would -    //  prevent exiting on any events that may already been signaled on -    //  0MQ sockets.      fd_set inset, outset, errset; -    memcpy (&inset, &pollset_in, sizeof (fd_set)); -    memcpy (&outset, &pollset_out, sizeof (fd_set)); -    memcpy (&errset, &pollset_err, sizeof (fd_set)); -    int rc = select (maxfd, &inset, &outset, &errset, &zero_timeout); + +    //  Wait for events. Ignore interrupts if there's infinite timeout. +    while (true) { +        memcpy (&inset, &pollset_in, sizeof (fd_set)); +        memcpy (&outset, &pollset_out, sizeof (fd_set)); +        memcpy (&errset, &pollset_err, sizeof (fd_set)); +        int rc = select (maxfd, &inset, &outset, &errset, +            (timeout_ < 0) ? NULL : &timeout);  #if defined ZMQ_HAVE_WINDOWS -    wsa_assert (rc != SOCKET_ERROR); +        wsa_assert (rc != SOCKET_ERROR);  #else -    if (rc == -1 && errno == EINTR && timeout_ >= 0) -        return 0; -    errno_assert (rc >= 0 || (rc == -1 && errno == EINTR)); +        if (rc == -1 && errno == EINTR) { +            if (timeout_ < 0) +                continue; +            else +                //  TODO: Calculate remaining timeout and restart select (). +                return 0; +        } +        errno_assert (rc >= 0);  #endif +        break; +    } -    while (true) { - -        //  Process 0MQ commands if needed. -        if (nsockets && FD_ISSET (notify_fd, &inset)) -            if (!app_thread->process_commands (false, false)) { -                errno = ETERM; -                return -1; -            } +    //  Check for the events. +    for (int i = 0; i != nitems_; i++) { -        //  Check for the events. -        for (int i = 0; i != nitems_; i++) { +        items_ [i].revents = 0; -            //  If the poll item is a raw file descriptor, simply convert -            //  the events to zmq_pollitem_t-style format. -            if (!items_ [i].socket) { -                items_ [i].revents = 0; -                if (FD_ISSET (items_ [i].fd, &inset)) -                    items_ [i].revents |= ZMQ_POLLIN; -                if (FD_ISSET (items_ [i].fd, &outset)) +        //  The poll item is a 0MQ socket. Retrieve pending events +        //  using the ZMQ_EVENTS socket option. +        if (items_ [i].socket) { +            size_t zmq_fd_size = sizeof (zmq::fd_t); +            zmq::fd_t notify_fd; +            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, +                &zmq_fd_size) == -1) +                return -1; +            if (FD_ISSET (notify_fd, &inset)) { +                size_t zmq_events_size = sizeof (uint32_t); +                uint32_t zmq_events; +                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, +                    &zmq_events_size) == -1) +                    return -1; +                if ((items_ [i].events & ZMQ_POLLOUT) && +                    (zmq_events & ZMQ_POLLOUT))                      items_ [i].revents |= ZMQ_POLLOUT; -                if (FD_ISSET (items_ [i].fd, &errset)) -                    items_ [i].revents |= ZMQ_POLLERR; -                if (items_ [i].revents) -                    nevents++; -                continue; +                if ((items_ [i].events & ZMQ_POLLIN) && +                    (zmq_events & ZMQ_POLLIN)) +                    items_ [i].revents |= ZMQ_POLLIN;              } - -            //  The poll item is a 0MQ socket. -            zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; -            items_ [i].revents = 0; -            if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ()) -                items_ [i].revents |= ZMQ_POLLOUT; -            if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ()) +        } +        //  Else, the poll item is a raw file descriptor, simply convert +        //  the events to zmq_pollitem_t-style format. +        else { +            if (FD_ISSET (items_ [i].fd, &inset))                  items_ [i].revents |= ZMQ_POLLIN; -            if (items_ [i].revents) -                nevents++; +            if (FD_ISSET (items_ [i].fd, &outset)) +                items_ [i].revents |= ZMQ_POLLOUT; +            if (FD_ISSET (items_ [i].fd, &errset)) +                items_ [i].revents |= ZMQ_POLLERR;          } -        //  If there's at least one event, or if we are asked not to block, -        //  return immediately. -        if (nevents || (timeout.tv_sec == 0 && timeout.tv_usec == 0)) -            break; - -        //  Wait for events. Ignore interrupts if there's infinite timeout. -        while (true) { -            memcpy (&inset, &pollset_in, sizeof (fd_set)); -            memcpy (&outset, &pollset_out, sizeof (fd_set)); -            memcpy (&errset, &pollset_err, sizeof (fd_set)); -            rc = select (maxfd, &inset, &outset, &errset, -                block ? NULL : &timeout); -#if defined ZMQ_HAVE_WINDOWS -            wsa_assert (rc != SOCKET_ERROR); -#else -            if (rc == -1 && errno == EINTR) { -                if (timeout_ < 0) -                    continue; -                else { -                    rc = 0; -                    break; -                } -            } -            errno_assert (rc >= 0); -#endif -            break; -        } -         -        //  If timeout was hit with no events signaled, return zero. -        if (rc == 0) -            break; - -        //  If timeout was already applied, we don't want to poll anymore. -        //  Setting timeout to zero will cause termination of the function -        //  once the events we've got are processed. -        if (!block) -            timeout = zero_timeout; +        if (items_ [i].revents) +            nevents++;      }      return nevents;  | 
