diff options
author | Martin Lucina <mato@kotelna.sk> | 2010-08-07 18:24:12 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-08-25 15:39:20 +0200 |
commit | cd12508418530724f10a353fc3a14ab321d0be8e (patch) | |
tree | 37a8c0620a698129ae2381782809ea30f58235ce /src | |
parent | eb7b8a413a99b2e43e8feee410f2b860e99e7056 (diff) |
zmq_poll(): Rewrite to use ZMQ_FD/ZMQ_EVENTS pt1
Rewrite zmq_poll() to use ZMQ_FD and ZMQ_EVENTS introduced on the
wip-shutdown branch. Only do the poll()-based version of zmq_poll (), the
select()-based version will not compile at the moment.
Diffstat (limited to 'src')
-rw-r--r-- | src/zmq.cpp | 169 |
1 files changed, 54 insertions, 115 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp index 6dc577e..d7ce546 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -364,7 +364,6 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) { -/* #if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ @@ -377,138 +376,81 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) } pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); zmq_assert (pollfds); - int npollfds = 0; - int nsockets = 0; - - zmq::app_thread_t *app_thread = NULL; + // Build pollset for poll () system call. for (int i = 0; i != nitems_; i++) { - // 0MQ sockets. + // If the poll item is a 0MQ socket, we poll on the file descriptor + // retrieved by the ZMQ_FD socket option. if (items_ [i].socket) { - - // Get the app_thread the socket is living in. If there are two - // sockets in the same pollset with different app threads, fail. - zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; - if (app_thread) { - if (app_thread != s->get_thread ()) { - free (pollfds); - errno = EFAULT; - return -1; - } + size_t zmq_fd_size = sizeof (int); + if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd, + &zmq_fd_size) == -1) { + free (pollfds); + return -1; } - else - app_thread = s->get_thread (); - - nsockets++; - continue; + pollfds [i].events = POLLIN; } - - // Raw file descriptors. - pollfds [npollfds].fd = items_ [i].fd; - pollfds [npollfds].events = - (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) | - (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0); - npollfds++; + // Else, the poll item is a raw file descriptor. Just convert the + // events to normal POLLIN/POLLOUT for poll (). + else { + pollfds [i].fd = items_ [i].fd; + pollfds [i].events = + (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) | + (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0); + } } - // If there's at least one 0MQ socket in the pollset we have to poll - // for 0MQ commands. If ZMQ_POLL was not set, fail. - if (nsockets) { - pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd (); - if (pollfds [npollfds].fd == zmq::retired_fd) { - free (pollfds); - errno = ENOTSUP; - return -1; - } - pollfds [npollfds].events = POLLIN; - npollfds++; - } - - // First iteration just check for events, don't block. Waiting would - // prevent exiting on any events that may already been signaled on - // 0MQ sockets. - int rc = poll (pollfds, npollfds, 0); - if (rc == -1 && errno == EINTR && timeout_ >= 0) { - free (pollfds); - return 0; - } - errno_assert (rc >= 0 || (rc == -1 && errno == EINTR)); - int timeout = timeout_ > 0 ? timeout_ / 1000 : -1; int nevents = 0; + // Wait for events. Ignore interrupts if there's infinite timeout. while (true) { - - // Process 0MQ commands if needed. - if (nsockets && pollfds [npollfds -1].revents & POLLIN) - if (!app_thread->process_commands (false, false)) { - free (pollfds); - errno = ETERM; - return -1; - } - - // Check for the events. - int pollfd_pos = 0; - for (int i = 0; i != nitems_; i++) { - - // If the poll item is a raw file descriptor, simply convert - // the events to zmq_pollitem_t-style format. - if (!items_ [i].socket) { - items_ [i].revents = 0; - if (pollfds [pollfd_pos].revents & POLLIN) - items_ [i].revents |= ZMQ_POLLIN; - if (pollfds [pollfd_pos].revents & POLLOUT) - items_ [i].revents |= ZMQ_POLLOUT; - if (pollfds [pollfd_pos].revents & ~(POLLIN | POLLOUT)) - items_ [i].revents |= ZMQ_POLLERR; - - if (items_ [i].revents) - nevents++; - pollfd_pos++; + int rc = poll (pollfds, nitems_, timeout); + if (rc == -1 && errno == EINTR) { + if (timeout_ < 0) continue; + // Interrupted, no way to determine how much time is remaining + // from timeout so just return for now. + else { + free (pollfds); + return 0; } - - // The poll item is a 0MQ socket. - zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; - items_ [i].revents = 0; - if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ()) - items_ [i].revents |= ZMQ_POLLOUT; - if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ()) - items_ [i].revents |= ZMQ_POLLIN; - if (items_ [i].revents) - nevents++; } + errno_assert (rc >= 0); + break; + } - // If there's at least one event, or if we are asked not to block, - // return immediately. - if (nevents || !timeout_) - break; + // Check for the events. + for (int i = 0; i != nitems_; i++) { - // Wait for events. Ignore interrupts if there's infinite timeout. - while (true) { - rc = poll (pollfds, npollfds, timeout); - if (rc == -1 && errno == EINTR) { - if (timeout_ < 0) - continue; - else { - rc = 0; - break; + items_ [i].revents = 0; + + // The poll item is a 0MQ socket. Retrieve pending events + // using the ZMQ_EVENTS socket option. + if (items_ [i].socket) { + if (pollfds [i].revents & POLLIN) { + size_t zmq_events_size = sizeof (uint32_t); + if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, + &items_ [i].revents, &zmq_events_size) == -1) { + free (pollfds); + return -1; } } - errno_assert (rc >= 0); - break; } - - // If timeout was hit with no events signaled, return zero. - if (rc == 0) - break; + // Else, the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + else { + if (pollfds [i].revents & POLLIN) + items_ [i].revents |= ZMQ_POLLIN; + if (pollfds [i].revents & POLLOUT) + items_ [i].revents |= ZMQ_POLLOUT; + if (pollfds [i].revents & ~(POLLIN | POLLOUT)) + items_ [i].revents |= ZMQ_POLLERR; + } - // If timeout was already applied, we don't want to poll anymore. - // Setting timeout to zero will cause termination of the function - // once the events we've got are processed. - if (timeout > 0) - timeout = 0; + if (items_ [i].revents) + nevents++; } free (pollfds); @@ -677,9 +619,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) errno = ENOTSUP; return -1; #endif -*/ -zmq_assert (false); -return -1; } int zmq_errno () |