diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/zmq.cpp | 125 |
1 files changed, 118 insertions, 7 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp index 9b66be8..3497fe1 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -31,6 +31,7 @@ #include "platform.hpp" #include "stdint.hpp" #include "err.hpp" +#include "fd.hpp" #if defined ZMQ_HAVE_LINUX #include <poll.h> @@ -263,12 +264,10 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) int zmq_poll (zmq_pollitem_t *items_, int nitems_) { - // TODO: Replace the polling mechanism by the virtualised framework - // used in 0MQ I/O threads. That'll make the thing work on all platforms. -#if !defined ZMQ_HAVE_LINUX - errno = ENOTSUP; - return -1; -#else +#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ + defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ + defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ + defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); zmq_assert (pollfds); @@ -368,6 +367,119 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_) free (pollfds); return nevents; +#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS + + fd_set pollset_in; + FD_ZERO (&pollset_in); + fd_set pollset_out; + FD_ZERO (&pollset_out); + fd_set pollset_err; + FD_ZERO (&pollset_err); + + zmq::app_thread_t *app_thread = NULL; + int nsockets = 0; + zmq::fd_t maxfd = zmq::retired_fd; + zmq::fd_t notify_fd = zmq::retired_fd; + + for (int i = 0; i != nitems_; i++) { + + // 0MQ sockets. + if (items_ [i].socket) { + + // Get the app_thread the socket is living in. If there are two + // sockets in the same pollset with different app threads, fail. + zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; + if (app_thread) { + if (app_thread != s->get_thread ()) { + errno = EFAULT; + return -1; + } + } + else + app_thread = s->get_thread (); + + nsockets++; + continue; + } + + // Raw file descriptors. + if (items_ [i].events & ZMQ_POLLIN) + FD_SET (items_ [i].fd, &pollset_in); + if (items_ [i].events & ZMQ_POLLOUT) + FD_SET (items_ [i].fd, &pollset_out); + if (maxfd == zmq::retired_fd || maxfd < items_ [i].fd) + maxfd = items_ [i].fd; + } + + // If there's at least one 0MQ socket in the pollset we have to poll + // for 0MQ commands. If ZMQ_POLL was not set, fail. + if (nsockets) { + notify_fd = app_thread->get_signaler ()->get_fd (); + if (notify_fd == zmq::retired_fd) { + errno = ENOTSUP; + return -1; + } + FD_SET (notify_fd, &pollset_in); + if (maxfd == zmq::retired_fd || maxfd < notify_fd) + maxfd = notify_fd; + } + + int nevents = 0; + bool initial = true; + while (!nevents) { + + // Wait for activity. In the first iteration just check for events, + // don't wait. Waiting would prevent exiting on any events that may + // already be signaled on 0MQ sockets. + timeval timeout = {0, 0}; + int rc = select (maxfd, &pollset_in, &pollset_out, &pollset_err, + initial ? &timeout : NULL); +#if defined ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); +#else + if (rc == -1 && errno == EINTR) + continue; +#endif + + errno_assert (rc >= 0); + initial = false; + + // Process 0MQ commands if needed. + if (nsockets && FD_ISSET (notify_fd, &pollset_in)) + app_thread->process_commands (false, false); + + // Check for the events. + int pollfd_pos = 0; + for (int i = 0; i != nitems_; i++) { + + // If the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + if (!items_ [i].socket) { + items_ [i].revents = + (FD_ISSET (items_ [i].fd, &pollset_in) ? ZMQ_POLLIN : 0) | + (FD_ISSET (items_ [i].fd, &pollset_out) ? ZMQ_POLLOUT : 0); + if (items_ [i].revents) + nevents++; + continue; + } + + // The poll item is a 0MQ socket. + zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; + items_ [i].revents = 0; + if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ()) + items_ [i].revents |= ZMQ_POLLOUT; + if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ()) + items_ [i].revents |= ZMQ_POLLIN; + if (items_ [i].revents) + nevents++; + } + } + + return nevents; + +#else + errno = ENOTSUP; + return -1; #endif } @@ -428,4 +540,3 @@ unsigned long zmq_stopwatch_stop (void *watch_) free (watch_); return (unsigned long) (end - start); } - |