diff options
Diffstat (limited to 'src/zmq.cpp')
-rw-r--r-- | src/zmq.cpp | 117 |
1 files changed, 116 insertions, 1 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp index 2dfdd48..21ac612 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -25,11 +25,16 @@ #include <new> #include "socket_base.hpp" -#include "err.hpp" +#include "app_thread.hpp" #include "dispatcher.hpp" #include "msg_content.hpp" #include "platform.hpp" #include "stdint.hpp" +#include "err.hpp" + +#if defined ZMQ_HAVE_LINUX +#include <poll.h> +#endif #if !defined ZMQ_HAVE_WINDOWS #include <unistd.h> @@ -246,6 +251,116 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) return (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); } +int zmq_poll (zmq_pollitem_t *items_, int nitems_) +{ + // TODO: Replace the polling mechanism by the virtualised framework + // used in 0MQ I/O threads. That'll make the thing work on all platforms. +#if !defined ZMQ_HAVE_LINUX + errno = ENOTSUP; + return -1; +#else + + pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); + zmq_assert (pollfds); + int npollfds = 0; + int nsockets = 0; + + zmq::app_thread_t *app_thread = NULL; + + for (int i = 0; i != nitems_; i++) { + + // 0MQ sockets. + if (items_ [i].socket) { + + // Get the app_thread the socket is living in. If there are two + // sockets in the same pollset with different app threads, fail. + zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; + if (app_thread) { + if (app_thread != s->get_thread ()) { + free (pollfds); + errno = EFAULT; + return -1; + } + } + else + app_thread = s->get_thread (); + + nsockets++; + continue; + } + + // Raw file descriptors. + pollfds [npollfds].fd = items_ [i].fd; + pollfds [npollfds].events = + (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) | + (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0); + npollfds++; + } + + // If there's at least one 0MQ socket in the pollset we have to poll + // for 0MQ commands. If ZMQ_POLL was not set, fail. + if (nsockets) { + pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd (); + if (pollfds [npollfds].fd == zmq::retired_fd) { + free (pollfds); + errno = ENOTSUP; + return -1; + } + pollfds [npollfds].events = POLLIN; + npollfds++; + } + + int nevents = 0; + bool initial = true; + while (!nevents) { + + // Wait for activity. In the first iteration just check for events, + // don't wait. Waiting would prevent exiting on any events that may + // already be signaled on 0MQ sockets. + int rc = poll (pollfds, npollfds, initial ? 0 : -1); + if (rc == -1 && errno == EINTR) + continue; + errno_assert (rc >= 0); + initial = false; + + // Process 0MQ commands if needed. + if (nsockets && pollfds [npollfds -1].revents & POLLIN) + app_thread->process_commands (false, false); + + // Check for the events. + int pollfd_pos = 0; + for (int i = 0; i != nitems_; i++) { + + // If the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + if (!items_ [i].socket) { + items_ [i].revents = + (pollfds [pollfd_pos].revents & POLLIN ? ZMQ_POLLIN : 0) | + (pollfds [pollfd_pos].revents & POLLOUT ? ZMQ_POLLOUT : 0); + if (items_ [i].revents) + nevents++; + pollfd_pos++; + continue; + } + + // The poll item is a 0MQ socket. + zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; + items_ [i].revents = 0; + if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ()) + items_ [i].revents |= ZMQ_POLLOUT; + if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ()) + items_ [i].revents |= ZMQ_POLLIN; + if (items_ [i].revents) + nevents++; + } + } + + free (pollfds); + return nevents; + +#endif +} + #if defined ZMQ_HAVE_WINDOWS static uint64_t now () |