From cc631c4c6649b0d67114db13386a949426e35dbf Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 1 Oct 2009 10:56:17 +0200 Subject: ZMQII-18: Implement I/O multiplexing (first approximation) --- src/zmq.cpp | 117 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 116 insertions(+), 1 deletion(-) (limited to 'src/zmq.cpp') diff --git a/src/zmq.cpp b/src/zmq.cpp index 2dfdd48..21ac612 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -25,11 +25,16 @@ #include #include "socket_base.hpp" -#include "err.hpp" +#include "app_thread.hpp" #include "dispatcher.hpp" #include "msg_content.hpp" #include "platform.hpp" #include "stdint.hpp" +#include "err.hpp" + +#if defined ZMQ_HAVE_LINUX +#include +#endif #if !defined ZMQ_HAVE_WINDOWS #include @@ -246,6 +251,116 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) return (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); } +int zmq_poll (zmq_pollitem_t *items_, int nitems_) +{ + // TODO: Replace the polling mechanism by the virtualised framework + // used in 0MQ I/O threads. That'll make the thing work on all platforms. +#if !defined ZMQ_HAVE_LINUX + errno = ENOTSUP; + return -1; +#else + + pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); + zmq_assert (pollfds); + int npollfds = 0; + int nsockets = 0; + + zmq::app_thread_t *app_thread = NULL; + + for (int i = 0; i != nitems_; i++) { + + // 0MQ sockets. + if (items_ [i].socket) { + + // Get the app_thread the socket is living in. If there are two + // sockets in the same pollset with different app threads, fail. + zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; + if (app_thread) { + if (app_thread != s->get_thread ()) { + free (pollfds); + errno = EFAULT; + return -1; + } + } + else + app_thread = s->get_thread (); + + nsockets++; + continue; + } + + // Raw file descriptors. + pollfds [npollfds].fd = items_ [i].fd; + pollfds [npollfds].events = + (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) | + (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0); + npollfds++; + } + + // If there's at least one 0MQ socket in the pollset we have to poll + // for 0MQ commands. If ZMQ_POLL was not set, fail. + if (nsockets) { + pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd (); + if (pollfds [npollfds].fd == zmq::retired_fd) { + free (pollfds); + errno = ENOTSUP; + return -1; + } + pollfds [npollfds].events = POLLIN; + npollfds++; + } + + int nevents = 0; + bool initial = true; + while (!nevents) { + + // Wait for activity. In the first iteration just check for events, + // don't wait. Waiting would prevent exiting on any events that may + // already be signaled on 0MQ sockets. + int rc = poll (pollfds, npollfds, initial ? 0 : -1); + if (rc == -1 && errno == EINTR) + continue; + errno_assert (rc >= 0); + initial = false; + + // Process 0MQ commands if needed. + if (nsockets && pollfds [npollfds -1].revents & POLLIN) + app_thread->process_commands (false, false); + + // Check for the events. + int pollfd_pos = 0; + for (int i = 0; i != nitems_; i++) { + + // If the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + if (!items_ [i].socket) { + items_ [i].revents = + (pollfds [pollfd_pos].revents & POLLIN ? ZMQ_POLLIN : 0) | + (pollfds [pollfd_pos].revents & POLLOUT ? ZMQ_POLLOUT : 0); + if (items_ [i].revents) + nevents++; + pollfd_pos++; + continue; + } + + // The poll item is a 0MQ socket. + zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; + items_ [i].revents = 0; + if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ()) + items_ [i].revents |= ZMQ_POLLOUT; + if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ()) + items_ [i].revents |= ZMQ_POLLIN; + if (items_ [i].revents) + nevents++; + } + } + + free (pollfds); + return nevents; + +#endif +} + #if defined ZMQ_HAVE_WINDOWS static uint64_t now () -- cgit v1.2.3 From 49a9ef5fcb661827ee174415b4608e609bd0a65b Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 1 Oct 2009 13:48:04 +0200 Subject: windows error handling improved --- src/zmq.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'src/zmq.cpp') diff --git a/src/zmq.cpp b/src/zmq.cpp index 21ac612..7952b61 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -49,6 +49,14 @@ const char *zmq_strerror (int errnum_) return "Not supported"; case EPROTONOSUPPORT: return "Protocol not supported"; + case ENOBUFS: + return "No buffer space available"; + case ENETDOWN: + return "Network is down"; + case EADDRINUSE: + return "Address in use"; + case EADDRNOTAVAIL: + return "Address not available"; #endif case EMTHREAD: return "Number of preallocated application threads exceeded"; -- cgit v1.2.3