From cc631c4c6649b0d67114db13386a949426e35dbf Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 1 Oct 2009 10:56:17 +0200 Subject: ZMQII-18: Implement I/O multiplexing (first approximation) --- src/fd_signaler.hpp | 2 - src/i_signaler.hpp | 6 +++ src/p2p.cpp | 11 +++++ src/p2p.hpp | 2 + src/pub.cpp | 11 +++++ src/pub.hpp | 2 + src/rep.cpp | 17 ++++++++ src/rep.hpp | 2 + src/req.cpp | 13 ++++++ src/req.hpp | 2 + src/socket_base.cpp | 15 +++++++ src/socket_base.hpp | 12 ++++++ src/sub.cpp | 13 ++++++ src/sub.hpp | 2 + src/ypollset.cpp | 6 +++ src/ypollset.hpp | 1 + src/zmq.cpp | 117 +++++++++++++++++++++++++++++++++++++++++++++++++++- 17 files changed, 231 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp index e1b56ad..513a1e4 100644 --- a/src/fd_signaler.hpp +++ b/src/fd_signaler.hpp @@ -44,8 +44,6 @@ namespace zmq void signal (int signal_); uint64_t poll (); uint64_t check (); - - // Get the file descriptor associated with the object. fd_t get_fd (); private: diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp index a09fe7e..ad04bb5 100644 --- a/src/i_signaler.hpp +++ b/src/i_signaler.hpp @@ -21,6 +21,7 @@ #define __ZMQ_I_SIGNALER_HPP_INCLUDED__ #include "stdint.hpp" +#include "fd.hpp" namespace zmq { @@ -42,6 +43,11 @@ namespace zmq // Same as poll, however, if there is no signal available, // function returns zero immediately instead of waiting for a signal. virtual uint64_t check () = 0; + + // Returns file descriptor that allows waiting for signals. Specific + // signalers may not support this functionality. If so, the function + // returns retired_fd. + virtual fd_t get_fd () = 0; }; } diff --git a/src/p2p.cpp b/src/p2p.cpp index c43b7b4..445ba5b 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -84,4 +84,15 @@ int zmq::p2p_t::xrecv (zmq_msg_t *msg_, int flags_) return 0; } +bool zmq::p2p_t::xhas_in () +{ + zmq_assert (false); + return false; +} + +bool zmq::p2p_t::xhas_out () +{ + zmq_assert (false); + return false; +} diff --git a/src/p2p.hpp b/src/p2p.hpp index 1c98dd5..1fd7e34 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -42,6 +42,8 @@ namespace zmq int xsend (zmq_msg_t *msg_, int flags_); int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); private: diff --git a/src/pub.cpp b/src/pub.cpp index 1e66a18..63b235e 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -156,3 +156,14 @@ int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_) return -1; } +bool zmq::pub_t::xhas_in () +{ + return false; +} + +bool zmq::pub_t::xhas_out () +{ + // TODO: Reimplement when queue limits are added. + return true; +} + diff --git a/src/pub.hpp b/src/pub.hpp index 07eb5a1..b3e868d 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -43,6 +43,8 @@ namespace zmq int xsend (zmq_msg_t *msg_, int flags_); int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); private: diff --git a/src/rep.cpp b/src/rep.cpp index 137c735..e8a9e39 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -195,4 +195,21 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) return -1; } +bool zmq::rep_t::xhas_in () +{ + for (int count = active; count != 0; count--) { + if (in_pipes [current]->check_read ()) + return !waiting_for_reply; + current++; + if (current >= active) + current = 0; + } + + return false; +} + +bool zmq::rep_t::xhas_out () +{ + return waiting_for_reply; +} diff --git a/src/rep.hpp b/src/rep.hpp index 4781213..3e87dc1 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -43,6 +43,8 @@ namespace zmq int xsend (zmq_msg_t *msg_, int flags_); int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); private: diff --git a/src/req.cpp b/src/req.cpp index 63409ce..93a46e8 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -195,4 +195,17 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_) return 0; } +bool zmq::req_t::xhas_in () +{ + if (reply_pipe->check_read ()) + return waiting_for_reply; + + return false; +} + +bool zmq::req_t::xhas_out () +{ + return !waiting_for_reply; +} + diff --git a/src/req.hpp b/src/req.hpp index 3ae78fd..86554b5 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -43,6 +43,8 @@ namespace zmq int xsend (zmq_msg_t *msg_, int flags_); int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); private: diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 6763167..6583608 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -364,6 +364,21 @@ int zmq::socket_base_t::close () return 0; } +zmq::app_thread_t *zmq::socket_base_t::get_thread () +{ + return app_thread; +} + +bool zmq::socket_base_t::has_in () +{ + return xhas_in (); +} + +bool zmq::socket_base_t::has_out () +{ + return xhas_out (); +} + bool zmq::socket_base_t::register_session (const char *name_, session_t *session_) { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index c54efae..49ff5a5 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -54,6 +54,16 @@ namespace zmq int recv (zmq_msg_t *msg_, int flags_); int close (); + // This function is used by the polling mechanism to determine + // whether the socket belongs to the application thread the poll + // is called from. + class app_thread_t *get_thread (); + + // These functions are used by the polling mechanism to determine + // which events are to be reported from this socket. + bool has_in (); + bool has_out (); + // The list of sessions cannot be accessed via inter-thread // commands as it is unacceptable to wait for the completion of the // action till user application yields control of the application @@ -88,6 +98,8 @@ namespace zmq virtual int xsend (zmq_msg_t *msg_, int options_) = 0; virtual int xflush () = 0; virtual int xrecv (zmq_msg_t *msg_, int options_) = 0; + virtual bool xhas_in () = 0; + virtual bool xhas_out () = 0; // Socket options. options_t options; diff --git a/src/sub.cpp b/src/sub.cpp index 1bfbcd5..a7f9783 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -197,3 +197,16 @@ int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_) errno = EAGAIN; return -1; } + +bool zmq::sub_t::xhas_in () +{ + // TODO: This is more complex as we have to ignore all the messages that + // don't fit the filter. + zmq_assert (false); + return false; +} + +bool zmq::sub_t::xhas_out () +{ + return false; +} diff --git a/src/sub.hpp b/src/sub.hpp index 8691928..fb881dc 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -48,6 +48,8 @@ namespace zmq int xsend (zmq_msg_t *msg_, int flags_); int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); private: diff --git a/src/ypollset.cpp b/src/ypollset.cpp index a72eac7..4e73361 100644 --- a/src/ypollset.cpp +++ b/src/ypollset.cpp @@ -29,6 +29,7 @@ zmq::ypollset_t::~ypollset_t () void zmq::ypollset_t::signal (int signal_) { +printf ("++signal\n"); zmq_assert (signal_ >= 0 && signal_ < wait_signal); if (bits.btsr (signal_, wait_signal)) sem.post (); @@ -58,3 +59,8 @@ uint64_t zmq::ypollset_t::check () { return (uint64_t) bits.xchg (0); } + +zmq::fd_t zmq::ypollset_t::get_fd () +{ + return retired_fd; +} diff --git a/src/ypollset.hpp b/src/ypollset.hpp index 25eb3e0..810a638 100644 --- a/src/ypollset.hpp +++ b/src/ypollset.hpp @@ -42,6 +42,7 @@ namespace zmq void signal (int signal_); uint64_t poll (); uint64_t check (); + fd_t get_fd (); private: diff --git a/src/zmq.cpp b/src/zmq.cpp index 2dfdd48..21ac612 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -25,11 +25,16 @@ #include #include "socket_base.hpp" -#include "err.hpp" +#include "app_thread.hpp" #include "dispatcher.hpp" #include "msg_content.hpp" #include "platform.hpp" #include "stdint.hpp" +#include "err.hpp" + +#if defined ZMQ_HAVE_LINUX +#include +#endif #if !defined ZMQ_HAVE_WINDOWS #include @@ -246,6 +251,116 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) return (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); } +int zmq_poll (zmq_pollitem_t *items_, int nitems_) +{ + // TODO: Replace the polling mechanism by the virtualised framework + // used in 0MQ I/O threads. That'll make the thing work on all platforms. +#if !defined ZMQ_HAVE_LINUX + errno = ENOTSUP; + return -1; +#else + + pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); + zmq_assert (pollfds); + int npollfds = 0; + int nsockets = 0; + + zmq::app_thread_t *app_thread = NULL; + + for (int i = 0; i != nitems_; i++) { + + // 0MQ sockets. + if (items_ [i].socket) { + + // Get the app_thread the socket is living in. If there are two + // sockets in the same pollset with different app threads, fail. + zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; + if (app_thread) { + if (app_thread != s->get_thread ()) { + free (pollfds); + errno = EFAULT; + return -1; + } + } + else + app_thread = s->get_thread (); + + nsockets++; + continue; + } + + // Raw file descriptors. + pollfds [npollfds].fd = items_ [i].fd; + pollfds [npollfds].events = + (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) | + (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0); + npollfds++; + } + + // If there's at least one 0MQ socket in the pollset we have to poll + // for 0MQ commands. If ZMQ_POLL was not set, fail. + if (nsockets) { + pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd (); + if (pollfds [npollfds].fd == zmq::retired_fd) { + free (pollfds); + errno = ENOTSUP; + return -1; + } + pollfds [npollfds].events = POLLIN; + npollfds++; + } + + int nevents = 0; + bool initial = true; + while (!nevents) { + + // Wait for activity. In the first iteration just check for events, + // don't wait. Waiting would prevent exiting on any events that may + // already be signaled on 0MQ sockets. + int rc = poll (pollfds, npollfds, initial ? 0 : -1); + if (rc == -1 && errno == EINTR) + continue; + errno_assert (rc >= 0); + initial = false; + + // Process 0MQ commands if needed. + if (nsockets && pollfds [npollfds -1].revents & POLLIN) + app_thread->process_commands (false, false); + + // Check for the events. + int pollfd_pos = 0; + for (int i = 0; i != nitems_; i++) { + + // If the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + if (!items_ [i].socket) { + items_ [i].revents = + (pollfds [pollfd_pos].revents & POLLIN ? ZMQ_POLLIN : 0) | + (pollfds [pollfd_pos].revents & POLLOUT ? ZMQ_POLLOUT : 0); + if (items_ [i].revents) + nevents++; + pollfd_pos++; + continue; + } + + // The poll item is a 0MQ socket. + zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; + items_ [i].revents = 0; + if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ()) + items_ [i].revents |= ZMQ_POLLOUT; + if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ()) + items_ [i].revents |= ZMQ_POLLIN; + if (items_ [i].revents) + nevents++; + } + } + + free (pollfds); + return nevents; + +#endif +} + #if defined ZMQ_HAVE_WINDOWS static uint64_t now () -- cgit v1.2.3