summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/zmq.cpp125
1 files changed, 118 insertions, 7 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 9b66be8..3497fe1 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -31,6 +31,7 @@
#include "platform.hpp"
#include "stdint.hpp"
#include "err.hpp"
+#include "fd.hpp"
#if defined ZMQ_HAVE_LINUX
#include <poll.h>
@@ -263,12 +264,10 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
int zmq_poll (zmq_pollitem_t *items_, int nitems_)
{
- // TODO: Replace the polling mechanism by the virtualised framework
- // used in 0MQ I/O threads. That'll make the thing work on all platforms.
-#if !defined ZMQ_HAVE_LINUX
- errno = ENOTSUP;
- return -1;
-#else
+#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
+ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
+ defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
+ defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX
pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
zmq_assert (pollfds);
@@ -368,6 +367,119 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_)
free (pollfds);
return nevents;
+#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+
+ fd_set pollset_in;
+ FD_ZERO (&pollset_in);
+ fd_set pollset_out;
+ FD_ZERO (&pollset_out);
+ fd_set pollset_err;
+ FD_ZERO (&pollset_err);
+
+ zmq::app_thread_t *app_thread = NULL;
+ int nsockets = 0;
+ zmq::fd_t maxfd = zmq::retired_fd;
+ zmq::fd_t notify_fd = zmq::retired_fd;
+
+ for (int i = 0; i != nitems_; i++) {
+
+ // 0MQ sockets.
+ if (items_ [i].socket) {
+
+ // Get the app_thread the socket is living in. If there are two
+ // sockets in the same pollset with different app threads, fail.
+ zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
+ if (app_thread) {
+ if (app_thread != s->get_thread ()) {
+ errno = EFAULT;
+ return -1;
+ }
+ }
+ else
+ app_thread = s->get_thread ();
+
+ nsockets++;
+ continue;
+ }
+
+ // Raw file descriptors.
+ if (items_ [i].events & ZMQ_POLLIN)
+ FD_SET (items_ [i].fd, &pollset_in);
+ if (items_ [i].events & ZMQ_POLLOUT)
+ FD_SET (items_ [i].fd, &pollset_out);
+ if (maxfd == zmq::retired_fd || maxfd < items_ [i].fd)
+ maxfd = items_ [i].fd;
+ }
+
+ // If there's at least one 0MQ socket in the pollset we have to poll
+ // for 0MQ commands. If ZMQ_POLL was not set, fail.
+ if (nsockets) {
+ notify_fd = app_thread->get_signaler ()->get_fd ();
+ if (notify_fd == zmq::retired_fd) {
+ errno = ENOTSUP;
+ return -1;
+ }
+ FD_SET (notify_fd, &pollset_in);
+ if (maxfd == zmq::retired_fd || maxfd < notify_fd)
+ maxfd = notify_fd;
+ }
+
+ int nevents = 0;
+ bool initial = true;
+ while (!nevents) {
+
+ // Wait for activity. In the first iteration just check for events,
+ // don't wait. Waiting would prevent exiting on any events that may
+ // already be signaled on 0MQ sockets.
+ timeval timeout = {0, 0};
+ int rc = select (maxfd, &pollset_in, &pollset_out, &pollset_err,
+ initial ? &timeout : NULL);
+#if defined ZMQ_HAVE_WINDOWS
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ if (rc == -1 && errno == EINTR)
+ continue;
+#endif
+
+ errno_assert (rc >= 0);
+ initial = false;
+
+ // Process 0MQ commands if needed.
+ if (nsockets && FD_ISSET (notify_fd, &pollset_in))
+ app_thread->process_commands (false, false);
+
+ // Check for the events.
+ int pollfd_pos = 0;
+ for (int i = 0; i != nitems_; i++) {
+
+ // If the poll item is a raw file descriptor, simply convert
+ // the events to zmq_pollitem_t-style format.
+ if (!items_ [i].socket) {
+ items_ [i].revents =
+ (FD_ISSET (items_ [i].fd, &pollset_in) ? ZMQ_POLLIN : 0) |
+ (FD_ISSET (items_ [i].fd, &pollset_out) ? ZMQ_POLLOUT : 0);
+ if (items_ [i].revents)
+ nevents++;
+ continue;
+ }
+
+ // The poll item is a 0MQ socket.
+ zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
+ items_ [i].revents = 0;
+ if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
+ items_ [i].revents |= ZMQ_POLLOUT;
+ if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
+ items_ [i].revents |= ZMQ_POLLIN;
+ if (items_ [i].revents)
+ nevents++;
+ }
+ }
+
+ return nevents;
+
+#else
+ errno = ENOTSUP;
+ return -1;
#endif
}
@@ -428,4 +540,3 @@ unsigned long zmq_stopwatch_stop (void *watch_)
free (watch_);
return (unsigned long) (end - start);
}
-