From a85d1e51bff991a0d2f93ded2724e0ee290edf12 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Sat, 7 Aug 2010 20:35:42 +0200 Subject: zmq_poll(): Rewrite to use ZMQ_FD/ZMQ_EVENTS pt2 Rewrite the select()-based zmq_poll() implementation to use ZMQ_FD and ZMQ_EVENTS. Also fix some corner cases: We should not pollute revents with unrequested events, and we don't need to poll on ZMQ_FD at all if a pollitem with no events set was passed in. --- src/zmq.cpp | 227 ++++++++++++++++++++++++------------------------------------ 1 file changed, 90 insertions(+), 137 deletions(-) diff --git a/src/zmq.cpp b/src/zmq.cpp index ceac158..6ec8fbe 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -382,8 +382,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) // If the poll item is a 0MQ socket, we poll on the file descriptor // retrieved by the ZMQ_FD socket option. - if (items_ [i].socket) { - size_t zmq_fd_size = sizeof (int); + if (items_ [i].socket && items_ [i].events) { + size_t zmq_fd_size = sizeof (zmq::fd_t); if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd, &zmq_fd_size) == -1) { free (pollfds); @@ -410,9 +410,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) if (rc == -1 && errno == EINTR) { if (timeout_ < 0) continue; - // Interrupted, no way to determine how much time is remaining - // from timeout so just return for now. else { + // TODO: Calculate remaining timeout and restart poll (). free (pollfds); return 0; } @@ -428,15 +427,18 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) // The poll item is a 0MQ socket. Retrieve pending events // using the ZMQ_EVENTS socket option. - if (items_ [i].socket) { - if (pollfds [i].revents & POLLIN) { - size_t zmq_events_size = sizeof (uint32_t); - if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, - &items_ [i].revents, &zmq_events_size) == -1) { - free (pollfds); - return -1; - } + if (items_ [i].socket && (pollfds [i].revents & POLLIN)) { + size_t zmq_events_size = sizeof (uint32_t); + uint32_t zmq_events; + if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, + &zmq_events_size) == -1) { + free (pollfds); + return -1; } + if ((items_ [i].events & ZMQ_POLLOUT) && (zmq_events & ZMQ_POLLOUT)) + items_ [i].revents |= ZMQ_POLLOUT; + if ((items_ [i].events & ZMQ_POLLIN) && (zmq_events & ZMQ_POLLIN)) + items_ [i].revents |= ZMQ_POLLIN; } // Else, the poll item is a raw file descriptor, simply convert // the events to zmq_pollitem_t-style format. @@ -465,152 +467,103 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) fd_set pollset_err; FD_ZERO (&pollset_err); - zmq::app_thread_t *app_thread = NULL; - int nsockets = 0; - zmq::fd_t maxfd = zmq::retired_fd; - zmq::fd_t notify_fd = zmq::retired_fd; + zmq::fd_t maxfd = 0; + // Build the fd_sets for passing to select (). for (int i = 0; i != nitems_; i++) { - // 0MQ sockets. - if (items_ [i].socket) { - - // Get the app_thread the socket is living in. If there are two - // sockets in the same pollset with different app threads, fail. - zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; - if (app_thread) { - if (app_thread != s->get_thread ()) { - errno = EFAULT; - return -1; - } - } - else - app_thread = s->get_thread (); - - nsockets++; - continue; + // If the poll item is a 0MQ socket we are interested in input on the + // notification file descriptor retrieved by the ZMQ_FD socket option. + if (items_ [i].socket && items_ [i].events) { + size_t zmq_fd_size = sizeof (zmq::fd_t); + zmq::fd_t notify_fd; + if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, + &zmq_fd_size) == -1) + return -1; + FD_SET (notify_fd, &pollset_in); + if (maxfd < notify_fd) + maxfd = notify_fd; } - - // Raw file descriptors. - if (items_ [i].events & ZMQ_POLLIN) - FD_SET (items_ [i].fd, &pollset_in); - if (items_ [i].events & ZMQ_POLLOUT) - FD_SET (items_ [i].fd, &pollset_out); - if (items_ [i].events & ZMQ_POLLERR) - FD_SET (items_ [i].fd, &pollset_err); - if (maxfd == zmq::retired_fd || maxfd < items_ [i].fd) - maxfd = items_ [i].fd; - } - - // If there's at least one 0MQ socket in the pollset we have to poll - // for 0MQ commands. If ZMQ_POLL was not set, fail. - if (nsockets) { - notify_fd = app_thread->get_signaler ()->get_fd (); - if (notify_fd == zmq::retired_fd) { - errno = ENOTSUP; - return -1; + // Else, the poll item is a raw file descriptor. Convert the poll item + // events to the appropriate fd_sets. + else { + if (items_ [i].events & ZMQ_POLLIN) + FD_SET (items_ [i].fd, &pollset_in); + if (items_ [i].events & ZMQ_POLLOUT) + FD_SET (items_ [i].fd, &pollset_out); + if (items_ [i].events & ZMQ_POLLERR) + FD_SET (items_ [i].fd, &pollset_err); + if (maxfd < items_ [i].fd) + maxfd = items_ [i].fd; } - FD_SET (notify_fd, &pollset_in); - if (maxfd == zmq::retired_fd || maxfd < notify_fd) - maxfd = notify_fd; } - bool block = (timeout_ < 0); timeval timeout = {timeout_ / 1000000, timeout_ % 1000000}; - timeval zero_timeout = {0, 0}; int nevents = 0; - - // First iteration just check for events, don't block. Waiting would - // prevent exiting on any events that may already been signaled on - // 0MQ sockets. fd_set inset, outset, errset; - memcpy (&inset, &pollset_in, sizeof (fd_set)); - memcpy (&outset, &pollset_out, sizeof (fd_set)); - memcpy (&errset, &pollset_err, sizeof (fd_set)); - int rc = select (maxfd, &inset, &outset, &errset, &zero_timeout); + + // Wait for events. Ignore interrupts if there's infinite timeout. + while (true) { + memcpy (&inset, &pollset_in, sizeof (fd_set)); + memcpy (&outset, &pollset_out, sizeof (fd_set)); + memcpy (&errset, &pollset_err, sizeof (fd_set)); + int rc = select (maxfd, &inset, &outset, &errset, + (timeout_ < 0) ? NULL : &timeout); #if defined ZMQ_HAVE_WINDOWS - wsa_assert (rc != SOCKET_ERROR); + wsa_assert (rc != SOCKET_ERROR); #else - if (rc == -1 && errno == EINTR && timeout_ >= 0) - return 0; - errno_assert (rc >= 0 || (rc == -1 && errno == EINTR)); + if (rc == -1 && errno == EINTR) { + if (timeout_ < 0) + continue; + else + // TODO: Calculate remaining timeout and restart select (). + return 0; + } + errno_assert (rc >= 0); #endif + break; + } - while (true) { - - // Process 0MQ commands if needed. - if (nsockets && FD_ISSET (notify_fd, &inset)) - if (!app_thread->process_commands (false, false)) { - errno = ETERM; - return -1; - } + // Check for the events. + for (int i = 0; i != nitems_; i++) { - // Check for the events. - for (int i = 0; i != nitems_; i++) { + items_ [i].revents = 0; - // If the poll item is a raw file descriptor, simply convert - // the events to zmq_pollitem_t-style format. - if (!items_ [i].socket) { - items_ [i].revents = 0; - if (FD_ISSET (items_ [i].fd, &inset)) - items_ [i].revents |= ZMQ_POLLIN; - if (FD_ISSET (items_ [i].fd, &outset)) + // The poll item is a 0MQ socket. Retrieve pending events + // using the ZMQ_EVENTS socket option. + if (items_ [i].socket) { + size_t zmq_fd_size = sizeof (zmq::fd_t); + zmq::fd_t notify_fd; + if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, + &zmq_fd_size) == -1) + return -1; + if (FD_ISSET (notify_fd, &inset)) { + size_t zmq_events_size = sizeof (uint32_t); + uint32_t zmq_events; + if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, + &zmq_events_size) == -1) + return -1; + if ((items_ [i].events & ZMQ_POLLOUT) && + (zmq_events & ZMQ_POLLOUT)) items_ [i].revents |= ZMQ_POLLOUT; - if (FD_ISSET (items_ [i].fd, &errset)) - items_ [i].revents |= ZMQ_POLLERR; - if (items_ [i].revents) - nevents++; - continue; + if ((items_ [i].events & ZMQ_POLLIN) && + (zmq_events & ZMQ_POLLIN)) + items_ [i].revents |= ZMQ_POLLIN; } - - // The poll item is a 0MQ socket. - zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; - items_ [i].revents = 0; - if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ()) - items_ [i].revents |= ZMQ_POLLOUT; - if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ()) + } + // Else, the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + else { + if (FD_ISSET (items_ [i].fd, &inset)) items_ [i].revents |= ZMQ_POLLIN; - if (items_ [i].revents) - nevents++; + if (FD_ISSET (items_ [i].fd, &outset)) + items_ [i].revents |= ZMQ_POLLOUT; + if (FD_ISSET (items_ [i].fd, &errset)) + items_ [i].revents |= ZMQ_POLLERR; } - // If there's at least one event, or if we are asked not to block, - // return immediately. - if (nevents || (timeout.tv_sec == 0 && timeout.tv_usec == 0)) - break; - - // Wait for events. Ignore interrupts if there's infinite timeout. - while (true) { - memcpy (&inset, &pollset_in, sizeof (fd_set)); - memcpy (&outset, &pollset_out, sizeof (fd_set)); - memcpy (&errset, &pollset_err, sizeof (fd_set)); - rc = select (maxfd, &inset, &outset, &errset, - block ? NULL : &timeout); -#if defined ZMQ_HAVE_WINDOWS - wsa_assert (rc != SOCKET_ERROR); -#else - if (rc == -1 && errno == EINTR) { - if (timeout_ < 0) - continue; - else { - rc = 0; - break; - } - } - errno_assert (rc >= 0); -#endif - break; - } - - // If timeout was hit with no events signaled, return zero. - if (rc == 0) - break; - - // If timeout was already applied, we don't want to poll anymore. - // Setting timeout to zero will cause termination of the function - // once the events we've got are processed. - if (!block) - timeout = zero_timeout; + if (items_ [i].revents) + nevents++; } return nevents; -- cgit v1.2.3