diff options
Diffstat (limited to 'src/zmq.cpp')
-rw-r--r-- | src/zmq.cpp | 362 |
1 files changed, 148 insertions, 214 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp index f3ccaac..e93f8b7 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -29,7 +29,6 @@ #include "queue.hpp" #include "streamer.hpp" #include "socket_base.hpp" -#include "app_thread.hpp" #include "msg_content.hpp" #include "platform.hpp" #include "stdint.hpp" @@ -83,14 +82,14 @@ const char *zmq_strerror (int errnum_) case EINPROGRESS: return "Operation in progress"; #endif - case EMTHREAD: - return "Number of preallocated application threads exceeded"; case EFSM: return "Operation cannot be accomplished in current state"; case ENOCOMPATPROTO: return "The protocol is not compatible with the socket type"; case ETERM: return "Context was terminated"; + case EMTHREAD: + return "No thread available"; default: #if defined _MSC_VER #pragma warning (push) @@ -277,7 +276,7 @@ int zmq_term (void *ctx_) return -1; } - int rc = ((zmq::ctx_t*) ctx_)->term (); + int rc = ((zmq::ctx_t*) ctx_)->terminate (); int en = errno; #if defined ZMQ_HAVE_OPENPGM @@ -378,140 +377,104 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) errno = EFAULT; return -1; } + pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); zmq_assert (pollfds); - int npollfds = 0; - int nsockets = 0; - - zmq::app_thread_t *app_thread = NULL; + // Build pollset for poll () system call. for (int i = 0; i != nitems_; i++) { - // 0MQ sockets. + // If the poll item is a 0MQ socket, we poll on the file descriptor + // retrieved by the ZMQ_FD socket option. if (items_ [i].socket) { - - // Get the app_thread the socket is living in. If there are two - // sockets in the same pollset with different app threads, fail. - zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; - if (app_thread) { - if (app_thread != s->get_thread ()) { - free (pollfds); - errno = EFAULT; - return -1; - } + size_t zmq_fd_size = sizeof (zmq::fd_t); + if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd, + &zmq_fd_size) == -1) { + free (pollfds); + return -1; } - else - app_thread = s->get_thread (); - - nsockets++; - continue; + pollfds [i].events = items_ [i].events ? POLLIN : 0; } - - // Raw file descriptors. - pollfds [npollfds].fd = items_ [i].fd; - pollfds [npollfds].events = - (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) | - (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0); - npollfds++; - } - - // If there's at least one 0MQ socket in the pollset we have to poll - // for 0MQ commands. If ZMQ_POLL was not set, fail. - if (nsockets) { - pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd (); - if (pollfds [npollfds].fd == zmq::retired_fd) { - free (pollfds); - errno = ENOTSUP; - return -1; + // Else, the poll item is a raw file descriptor. Just convert the + // events to normal POLLIN/POLLOUT for poll (). + else { + pollfds [i].fd = items_ [i].fd; + pollfds [i].events = + (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) | + (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0); } - pollfds [npollfds].events = POLLIN; - npollfds++; } - // First iteration just check for events, don't block. Waiting would - // prevent exiting on any events that may already been signaled on - // 0MQ sockets. - int rc = poll (pollfds, npollfds, 0); - if (rc == -1 && errno == EINTR && timeout_ >= 0) { - free (pollfds); - return 0; - } - errno_assert (rc >= 0 || (rc == -1 && errno == EINTR)); - + bool first_pass = true; int timeout = timeout_ > 0 ? timeout_ / 1000 : -1; int nevents = 0; while (true) { - // Process 0MQ commands if needed. - if (nsockets && pollfds [npollfds -1].revents & POLLIN) - if (!app_thread->process_commands (false, false)) { + // Wait for events. + while (true) { + int rc = poll (pollfds, nitems_, first_pass ? 0 : timeout); + if (rc == -1 && errno == EINTR) { free (pollfds); - errno = ETERM; return -1; } + errno_assert (rc >= 0); + break; + } // Check for the events. - int pollfd_pos = 0; for (int i = 0; i != nitems_; i++) { - // If the poll item is a raw file descriptor, simply convert + items_ [i].revents = 0; + + // The poll item is a 0MQ socket. Retrieve pending events + // using the ZMQ_EVENTS socket option. + if (items_ [i].socket) { + size_t zmq_events_size = sizeof (uint32_t); + uint32_t zmq_events; + if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, + &zmq_events_size) == -1) { + free (pollfds); + return -1; + } + if ((items_ [i].events & ZMQ_POLLOUT) && + (zmq_events & ZMQ_POLLOUT)) + items_ [i].revents |= ZMQ_POLLOUT; + if ((items_ [i].events & ZMQ_POLLIN) && + (zmq_events & ZMQ_POLLIN)) + items_ [i].revents |= ZMQ_POLLIN; + } + // Else, the poll item is a raw file descriptor, simply convert // the events to zmq_pollitem_t-style format. - if (!items_ [i].socket) { - items_ [i].revents = 0; - if (pollfds [pollfd_pos].revents & POLLIN) + else { + if (pollfds [i].revents & POLLIN) items_ [i].revents |= ZMQ_POLLIN; - if (pollfds [pollfd_pos].revents & POLLOUT) + if (pollfds [i].revents & POLLOUT) items_ [i].revents |= ZMQ_POLLOUT; - if (pollfds [pollfd_pos].revents & ~(POLLIN | POLLOUT)) + if (pollfds [i].revents & ~(POLLIN | POLLOUT)) items_ [i].revents |= ZMQ_POLLERR; - - if (items_ [i].revents) - nevents++; - pollfd_pos++; - continue; } - // The poll item is a 0MQ socket. - zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; - items_ [i].revents = 0; - if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ()) - items_ [i].revents |= ZMQ_POLLOUT; - if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ()) - items_ [i].revents |= ZMQ_POLLIN; if (items_ [i].revents) nevents++; } - // If there's at least one event, or if we are asked not to block, - // return immediately. - if (nevents || !timeout_) - break; - - // Wait for events. Ignore interrupts if there's infinite timeout. - while (true) { - rc = poll (pollfds, npollfds, timeout); - if (rc == -1 && errno == EINTR) { - if (timeout_ < 0) - continue; - else { - rc = 0; - break; - } - } - errno_assert (rc >= 0); - break; + // If there are no events from the first pass (the one with no + // timout), do at least the second pass so that we wait. + if (first_pass && nevents == 0 && timeout_ != 0) { + first_pass = false; + continue; } - - // If timeout was hit with no events signaled, return zero. - if (rc == 0) - break; - // If timeout was already applied, we don't want to poll anymore. - // Setting timeout to zero will cause termination of the function - // once the events we've got are processed. - if (timeout > 0) - timeout = 0; + // If timeout is set to infinite and we have to events to return + // we can restart the polling. + if (timeout == -1 && nevents == 0) + continue; + + // TODO: if nevents is zero recompute timeout and loop + // if it is not yet reached. + + break; } free (pollfds); @@ -526,161 +489,132 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) fd_set pollset_err; FD_ZERO (&pollset_err); - zmq::app_thread_t *app_thread = NULL; - int nsockets = 0; - zmq::fd_t maxfd = zmq::retired_fd; - zmq::fd_t notify_fd = zmq::retired_fd; + zmq::fd_t maxfd = 0; // Ensure we do not attempt to select () on more than FD_SETSIZE // file descriptors. zmq_assert (nitems_ <= FD_SETSIZE); + // Build the fd_sets for passing to select (). for (int i = 0; i != nitems_; i++) { - // 0MQ sockets. + // If the poll item is a 0MQ socket we are interested in input on the + // notification file descriptor retrieved by the ZMQ_FD socket option. if (items_ [i].socket) { - - // Get the app_thread the socket is living in. If there are two - // sockets in the same pollset with different app threads, fail. - zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; - if (app_thread) { - if (app_thread != s->get_thread ()) { - errno = EFAULT; - return -1; - } + size_t zmq_fd_size = sizeof (zmq::fd_t); + zmq::fd_t notify_fd; + if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, + &zmq_fd_size) == -1) + return -1; + if (items_ [i].events) { + FD_SET (notify_fd, &pollset_in); + if (maxfd < notify_fd) + maxfd = notify_fd; } - else - app_thread = s->get_thread (); - - nsockets++; - continue; } - - // Raw file descriptors. - if (items_ [i].events & ZMQ_POLLIN) - FD_SET (items_ [i].fd, &pollset_in); - if (items_ [i].events & ZMQ_POLLOUT) - FD_SET (items_ [i].fd, &pollset_out); - if (items_ [i].events & ZMQ_POLLERR) - FD_SET (items_ [i].fd, &pollset_err); - if (maxfd == zmq::retired_fd || maxfd < items_ [i].fd) - maxfd = items_ [i].fd; - } - - // If there's at least one 0MQ socket in the pollset we have to poll - // for 0MQ commands. If ZMQ_POLL was not set, fail. - if (nsockets) { - notify_fd = app_thread->get_signaler ()->get_fd (); - if (notify_fd == zmq::retired_fd) { - errno = ENOTSUP; - return -1; + // Else, the poll item is a raw file descriptor. Convert the poll item + // events to the appropriate fd_sets. + else { + if (items_ [i].events & ZMQ_POLLIN) + FD_SET (items_ [i].fd, &pollset_in); + if (items_ [i].events & ZMQ_POLLOUT) + FD_SET (items_ [i].fd, &pollset_out); + if (items_ [i].events & ZMQ_POLLERR) + FD_SET (items_ [i].fd, &pollset_err); + if (maxfd < items_ [i].fd) + maxfd = items_ [i].fd; } - FD_SET (notify_fd, &pollset_in); - if (maxfd == zmq::retired_fd || maxfd < notify_fd) - maxfd = notify_fd; } - bool block = (timeout_ < 0); - timeval timeout = {timeout_ / 1000000, timeout_ % 1000000}; + bool first_pass = true; timeval zero_timeout = {0, 0}; + timeval timeout = {timeout_ / 1000000, timeout_ % 1000000}; int nevents = 0; - - // First iteration just check for events, don't block. Waiting would - // prevent exiting on any events that may already been signaled on - // 0MQ sockets. fd_set inset, outset, errset; - memcpy (&inset, &pollset_in, sizeof (fd_set)); - memcpy (&outset, &pollset_out, sizeof (fd_set)); - memcpy (&errset, &pollset_err, sizeof (fd_set)); - int rc = select (maxfd, &inset, &outset, &errset, &zero_timeout); -#if defined ZMQ_HAVE_WINDOWS - wsa_assert (rc != SOCKET_ERROR); -#else - if (rc == -1 && errno == EINTR && timeout_ >= 0) - return 0; - errno_assert (rc >= 0 || (rc == -1 && errno == EINTR)); -#endif while (true) { - // Process 0MQ commands if needed. - if (nsockets && FD_ISSET (notify_fd, &inset)) - if (!app_thread->process_commands (false, false)) { - errno = ETERM; + // Wait for events. Ignore interrupts if there's infinite timeout. + while (true) { + memcpy (&inset, &pollset_in, sizeof (fd_set)); + memcpy (&outset, &pollset_out, sizeof (fd_set)); + memcpy (&errset, &pollset_err, sizeof (fd_set)); + int rc = select (maxfd, &inset, &outset, &errset, + first_pass ? &zero_timeout : (timeout_ < 0 ? NULL : &timeout)); +#if defined ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); +#else + if (rc == -1 && errno == EINTR) return -1; - } + errno_assert (rc >= 0); +#endif + break; + } // Check for the events. for (int i = 0; i != nitems_; i++) { - // If the poll item is a raw file descriptor, simply convert + items_ [i].revents = 0; + + // The poll item is a 0MQ socket. Retrieve pending events + // using the ZMQ_EVENTS socket option. + if (items_ [i].socket) { + size_t zmq_fd_size = sizeof (zmq::fd_t); + zmq::fd_t notify_fd; + if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, + &zmq_fd_size) == -1) + return -1; + if (FD_ISSET (notify_fd, &inset)) { + size_t zmq_events_size = sizeof (uint32_t); + uint32_t zmq_events; + if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, + &zmq_events_size) == -1) + return -1; + if ((items_ [i].events & ZMQ_POLLOUT) && + (zmq_events & ZMQ_POLLOUT)) + items_ [i].revents |= ZMQ_POLLOUT; + if ((items_ [i].events & ZMQ_POLLIN) && + (zmq_events & ZMQ_POLLIN)) + items_ [i].revents |= ZMQ_POLLIN; + } + } + // Else, the poll item is a raw file descriptor, simply convert // the events to zmq_pollitem_t-style format. - if (!items_ [i].socket) { - items_ [i].revents = 0; + else { if (FD_ISSET (items_ [i].fd, &inset)) items_ [i].revents |= ZMQ_POLLIN; if (FD_ISSET (items_ [i].fd, &outset)) items_ [i].revents |= ZMQ_POLLOUT; if (FD_ISSET (items_ [i].fd, &errset)) items_ [i].revents |= ZMQ_POLLERR; - if (items_ [i].revents) - nevents++; - continue; } - // The poll item is a 0MQ socket. - zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; - items_ [i].revents = 0; - if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ()) - items_ [i].revents |= ZMQ_POLLOUT; - if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ()) - items_ [i].revents |= ZMQ_POLLIN; if (items_ [i].revents) nevents++; } - // If there's at least one event, or if we are asked not to block, - // return immediately. - if (nevents || (timeout.tv_sec == 0 && timeout.tv_usec == 0)) - break; - - // Wait for events. Ignore interrupts if there's infinite timeout. - while (true) { - memcpy (&inset, &pollset_in, sizeof (fd_set)); - memcpy (&outset, &pollset_out, sizeof (fd_set)); - memcpy (&errset, &pollset_err, sizeof (fd_set)); - rc = select (maxfd, &inset, &outset, &errset, - block ? NULL : &timeout); -#if defined ZMQ_HAVE_WINDOWS - wsa_assert (rc != SOCKET_ERROR); -#else - if (rc == -1 && errno == EINTR) { - if (timeout_ < 0) - continue; - else { - rc = 0; - break; - } - } - errno_assert (rc >= 0); -#endif - break; + // If there are no events from the first pass (the one with no + // timout), do at least the second pass so that we wait. + if (first_pass && nevents == 0 && timeout_ != 0) { + first_pass = false; + continue; } - - // If timeout was hit with no events signaled, return zero. - if (rc == 0) - break; - // If timeout was already applied, we don't want to poll anymore. - // Setting timeout to zero will cause termination of the function - // once the events we've got are processed. - if (!block) - timeout = zero_timeout; + // If timeout is set to infinite and we have to events to return + // we can restart the polling. + if (timeout_ < 0 && nevents == 0) + continue; + + // TODO: if nevents is zero recompute timeout and loop + // if it is not yet reached. + + break; } return nevents; #else + // Exotic platforms that support neither poll() nor select(). errno = ENOTSUP; return -1; #endif |