From d4fdc26efc6b21103e446f712a484af910a57f2f Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 10 Dec 2009 16:46:22 +0100 Subject: zmq_poll implemented on Win32 platform --- bindings/c/zmq.h | 7 ++++ src/zmq.cpp | 125 +++++++++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 125 insertions(+), 7 deletions(-) diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h index ae8d6b1..f0d59b1 100644 --- a/bindings/c/zmq.h +++ b/bindings/c/zmq.h @@ -26,6 +26,9 @@ extern "C" { #include #include +#if defined _WIN32 +#include "winsock2.h" +#endif // Microsoft Visual Studio uses non-standard way to export/import symbols. #if defined ZMQ_BUILDING_LIBZMQ_WITH_MSVC @@ -185,7 +188,11 @@ ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags); typedef struct { void *socket; +#if defined _WIN32 + SOCKET fd; +#else int fd; +#endif short events; short revents; } zmq_pollitem_t; diff --git a/src/zmq.cpp b/src/zmq.cpp index 9b66be8..3497fe1 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -31,6 +31,7 @@ #include "platform.hpp" #include "stdint.hpp" #include "err.hpp" +#include "fd.hpp" #if defined ZMQ_HAVE_LINUX #include @@ -263,12 +264,10 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) int zmq_poll (zmq_pollitem_t *items_, int nitems_) { - // TODO: Replace the polling mechanism by the virtualised framework - // used in 0MQ I/O threads. That'll make the thing work on all platforms. -#if !defined ZMQ_HAVE_LINUX - errno = ENOTSUP; - return -1; -#else +#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ + defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ + defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ + defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); zmq_assert (pollfds); @@ -368,6 +367,119 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_) free (pollfds); return nevents; +#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS + + fd_set pollset_in; + FD_ZERO (&pollset_in); + fd_set pollset_out; + FD_ZERO (&pollset_out); + fd_set pollset_err; + FD_ZERO (&pollset_err); + + zmq::app_thread_t *app_thread = NULL; + int nsockets = 0; + zmq::fd_t maxfd = zmq::retired_fd; + zmq::fd_t notify_fd = zmq::retired_fd; + + for (int i = 0; i != nitems_; i++) { + + // 0MQ sockets. + if (items_ [i].socket) { + + // Get the app_thread the socket is living in. If there are two + // sockets in the same pollset with different app threads, fail. + zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; + if (app_thread) { + if (app_thread != s->get_thread ()) { + errno = EFAULT; + return -1; + } + } + else + app_thread = s->get_thread (); + + nsockets++; + continue; + } + + // Raw file descriptors. + if (items_ [i].events & ZMQ_POLLIN) + FD_SET (items_ [i].fd, &pollset_in); + if (items_ [i].events & ZMQ_POLLOUT) + FD_SET (items_ [i].fd, &pollset_out); + if (maxfd == zmq::retired_fd || maxfd < items_ [i].fd) + maxfd = items_ [i].fd; + } + + // If there's at least one 0MQ socket in the pollset we have to poll + // for 0MQ commands. If ZMQ_POLL was not set, fail. + if (nsockets) { + notify_fd = app_thread->get_signaler ()->get_fd (); + if (notify_fd == zmq::retired_fd) { + errno = ENOTSUP; + return -1; + } + FD_SET (notify_fd, &pollset_in); + if (maxfd == zmq::retired_fd || maxfd < notify_fd) + maxfd = notify_fd; + } + + int nevents = 0; + bool initial = true; + while (!nevents) { + + // Wait for activity. In the first iteration just check for events, + // don't wait. Waiting would prevent exiting on any events that may + // already be signaled on 0MQ sockets. + timeval timeout = {0, 0}; + int rc = select (maxfd, &pollset_in, &pollset_out, &pollset_err, + initial ? &timeout : NULL); +#if defined ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); +#else + if (rc == -1 && errno == EINTR) + continue; +#endif + + errno_assert (rc >= 0); + initial = false; + + // Process 0MQ commands if needed. + if (nsockets && FD_ISSET (notify_fd, &pollset_in)) + app_thread->process_commands (false, false); + + // Check for the events. + int pollfd_pos = 0; + for (int i = 0; i != nitems_; i++) { + + // If the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + if (!items_ [i].socket) { + items_ [i].revents = + (FD_ISSET (items_ [i].fd, &pollset_in) ? ZMQ_POLLIN : 0) | + (FD_ISSET (items_ [i].fd, &pollset_out) ? ZMQ_POLLOUT : 0); + if (items_ [i].revents) + nevents++; + continue; + } + + // The poll item is a 0MQ socket. + zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; + items_ [i].revents = 0; + if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ()) + items_ [i].revents |= ZMQ_POLLOUT; + if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ()) + items_ [i].revents |= ZMQ_POLLIN; + if (items_ [i].revents) + nevents++; + } + } + + return nevents; + +#else + errno = ENOTSUP; + return -1; #endif } @@ -428,4 +540,3 @@ unsigned long zmq_stopwatch_stop (void *watch_) free (watch_); return (unsigned long) (end - start); } - -- cgit v1.2.3