diff options
| -rw-r--r-- | bindings/c/zmq.h | 33 | ||||
| -rw-r--r-- | bindings/cpp/zmq.hpp | 5 | ||||
| -rw-r--r-- | src/fd_signaler.hpp | 2 | ||||
| -rw-r--r-- | src/i_signaler.hpp | 6 | ||||
| -rw-r--r-- | src/p2p.cpp | 11 | ||||
| -rw-r--r-- | src/p2p.hpp | 2 | ||||
| -rw-r--r-- | src/pub.cpp | 11 | ||||
| -rw-r--r-- | src/pub.hpp | 2 | ||||
| -rw-r--r-- | src/rep.cpp | 17 | ||||
| -rw-r--r-- | src/rep.hpp | 2 | ||||
| -rw-r--r-- | src/req.cpp | 13 | ||||
| -rw-r--r-- | src/req.hpp | 2 | ||||
| -rw-r--r-- | src/socket_base.cpp | 15 | ||||
| -rw-r--r-- | src/socket_base.hpp | 12 | ||||
| -rw-r--r-- | src/sub.cpp | 13 | ||||
| -rw-r--r-- | src/sub.hpp | 2 | ||||
| -rw-r--r-- | src/ypollset.cpp | 6 | ||||
| -rw-r--r-- | src/ypollset.hpp | 1 | ||||
| -rw-r--r-- | src/zmq.cpp | 117 | 
19 files changed, 269 insertions, 3 deletions
| diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h index e939dcd..3eeb3f8 100644 --- a/bindings/c/zmq.h +++ b/bindings/c/zmq.h @@ -354,6 +354,39 @@ ZMQ_EXPORT int zmq_flush (void *s);  ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags);  //////////////////////////////////////////////////////////////////////////////// +//  I/O multiplexing. +//////////////////////////////////////////////////////////////////////////////// + +#define ZMQ_POLLIN 1 +#define ZMQ_POLLOUT 2 + +//  'socket' is a 0MQ socket we want to poll on. If set to NULL, native file +//  descriptor (socket) 'fd' will be used instead. 'events' defines event we +//  are going to poll on - combination of ZMQ_POLLIN and ZMQ_POLLOUT. Error +//  event does not exist for portability reasons. Errors from native sockets +//  are reported as ZMQ_POLLIN. It's client's responsibilty to identify the +//  error afterwards. 'revents' field is filled in after function returns. It's +//  a combination of ZMQ_POLLIN and/or ZMQ_POLLOUT depending on the state of the +//  socket. +typedef struct +{ +    void *socket; +    int fd; +    short events; +    short revents; +} zmq_pollitem_t; + +//  Polls for the items specified by 'items'. Number of items in the array is +//  determined by 'nitems' argument. Returns number of items signaled, -1 +//  in the case of error. +// +//  Errors: EFAULT - there's a 0MQ socket in the pollset belonging to +//                   a different thread. +//          ENOTSUP - 0MQ context was initialised without ZMQ_POLL flag. +//                    I/O multiplexing is disabled. +ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems); + +////////////////////////////////////////////////////////////////////////////////  //  Helper functions.  //////////////////////////////////////////////////////////////////////////////// diff --git a/bindings/cpp/zmq.hpp b/bindings/cpp/zmq.hpp index 5c0ba7c..8a00230 100644 --- a/bindings/cpp/zmq.hpp +++ b/bindings/cpp/zmq.hpp @@ -200,6 +200,11 @@ namespace zmq                  throw error_t ();          } +        inline operator void* () +        { +            return ptr; +        } +          inline void setsockopt (int option_, const void *optval_,              size_t optvallen_)          { diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp index e1b56ad..513a1e4 100644 --- a/src/fd_signaler.hpp +++ b/src/fd_signaler.hpp @@ -44,8 +44,6 @@ namespace zmq          void signal (int signal_);          uint64_t poll ();          uint64_t check (); - -        //  Get the file descriptor associated with the object.          fd_t get_fd ();      private: diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp index a09fe7e..ad04bb5 100644 --- a/src/i_signaler.hpp +++ b/src/i_signaler.hpp @@ -21,6 +21,7 @@  #define __ZMQ_I_SIGNALER_HPP_INCLUDED__  #include "stdint.hpp" +#include "fd.hpp"  namespace zmq  { @@ -42,6 +43,11 @@ namespace zmq          //  Same as poll, however, if there is no signal available,          //  function returns zero immediately instead of waiting for a signal.          virtual uint64_t check () = 0; + +        //  Returns file descriptor that allows waiting for signals. Specific +        //  signalers may not support this functionality. If so, the function +        //  returns retired_fd. +        virtual fd_t get_fd () = 0;      };  } diff --git a/src/p2p.cpp b/src/p2p.cpp index c43b7b4..445ba5b 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -84,4 +84,15 @@ int zmq::p2p_t::xrecv (zmq_msg_t *msg_, int flags_)      return 0;  } +bool zmq::p2p_t::xhas_in () +{ +    zmq_assert (false); +    return false; +} + +bool zmq::p2p_t::xhas_out () +{ +    zmq_assert (false); +    return false; +} diff --git a/src/p2p.hpp b/src/p2p.hpp index 1c98dd5..1fd7e34 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -42,6 +42,8 @@ namespace zmq          int xsend (zmq_msg_t *msg_, int flags_);          int xflush ();          int xrecv (zmq_msg_t *msg_, int flags_); +        bool xhas_in (); +        bool xhas_out ();      private: diff --git a/src/pub.cpp b/src/pub.cpp index 1e66a18..63b235e 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -156,3 +156,14 @@ int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_)      return -1;  } +bool zmq::pub_t::xhas_in () +{ +    return false; +} + +bool zmq::pub_t::xhas_out () +{ +    //  TODO: Reimplement when queue limits are added. +    return true; +} + diff --git a/src/pub.hpp b/src/pub.hpp index 07eb5a1..b3e868d 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -43,6 +43,8 @@ namespace zmq          int xsend (zmq_msg_t *msg_, int flags_);          int xflush ();          int xrecv (zmq_msg_t *msg_, int flags_); +        bool xhas_in (); +        bool xhas_out ();      private: diff --git a/src/rep.cpp b/src/rep.cpp index 137c735..e8a9e39 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -195,4 +195,21 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)      return -1;  } +bool zmq::rep_t::xhas_in () +{ +    for (int count = active; count != 0; count--) { +        if (in_pipes [current]->check_read ()) +            return !waiting_for_reply; +        current++; +        if (current >= active) +            current = 0; +    } + +    return false; +} + +bool zmq::rep_t::xhas_out () +{ +    return waiting_for_reply; +} diff --git a/src/rep.hpp b/src/rep.hpp index 4781213..3e87dc1 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -43,6 +43,8 @@ namespace zmq          int xsend (zmq_msg_t *msg_, int flags_);          int xflush ();          int xrecv (zmq_msg_t *msg_, int flags_); +        bool xhas_in (); +        bool xhas_out ();      private: diff --git a/src/req.cpp b/src/req.cpp index 63409ce..93a46e8 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -195,4 +195,17 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)      return 0;  } +bool zmq::req_t::xhas_in () +{ +    if (reply_pipe->check_read ()) +        return waiting_for_reply; + +    return false; +} + +bool zmq::req_t::xhas_out () +{ +    return !waiting_for_reply; +} + diff --git a/src/req.hpp b/src/req.hpp index 3ae78fd..86554b5 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -43,6 +43,8 @@ namespace zmq          int xsend (zmq_msg_t *msg_, int flags_);          int xflush ();          int xrecv (zmq_msg_t *msg_, int flags_); +        bool xhas_in (); +        bool xhas_out ();      private: diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 6763167..6583608 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -364,6 +364,21 @@ int zmq::socket_base_t::close ()      return 0;  } +zmq::app_thread_t *zmq::socket_base_t::get_thread () +{ +    return app_thread; +} + +bool zmq::socket_base_t::has_in () +{ +    return xhas_in (); +} + +bool zmq::socket_base_t::has_out () +{ +    return xhas_out (); +} +  bool zmq::socket_base_t::register_session (const char *name_,      session_t *session_)  { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index c54efae..49ff5a5 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -54,6 +54,16 @@ namespace zmq          int recv (zmq_msg_t *msg_, int flags_);          int close (); +        //  This function is used by the polling mechanism to determine +        //  whether the socket belongs to the application thread the poll +        //  is called from. +        class app_thread_t *get_thread (); + +        //  These functions are used by the polling mechanism to determine +        //  which events are to be reported from this socket. +        bool has_in (); +        bool has_out (); +          //  The list of sessions cannot be accessed via inter-thread          //  commands as it is unacceptable to wait for the completion of the          //  action till user application yields control of the application @@ -88,6 +98,8 @@ namespace zmq          virtual int xsend (zmq_msg_t *msg_, int options_) = 0;          virtual int xflush () = 0;          virtual int xrecv (zmq_msg_t *msg_, int options_) = 0; +        virtual bool xhas_in () = 0; +        virtual bool xhas_out () = 0;          //  Socket options.          options_t options; diff --git a/src/sub.cpp b/src/sub.cpp index 1bfbcd5..a7f9783 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -197,3 +197,16 @@ int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_)      errno = EAGAIN;      return -1;  } + +bool zmq::sub_t::xhas_in () +{ +    //  TODO:  This is more complex as we have to ignore all the messages that +    //         don't fit the filter. +    zmq_assert (false); +    return false; +} + +bool zmq::sub_t::xhas_out () +{ +    return false; +} diff --git a/src/sub.hpp b/src/sub.hpp index 8691928..fb881dc 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -48,6 +48,8 @@ namespace zmq          int xsend (zmq_msg_t *msg_, int flags_);          int xflush ();          int xrecv (zmq_msg_t *msg_, int flags_); +        bool xhas_in (); +        bool xhas_out ();      private: diff --git a/src/ypollset.cpp b/src/ypollset.cpp index a72eac7..4e73361 100644 --- a/src/ypollset.cpp +++ b/src/ypollset.cpp @@ -29,6 +29,7 @@ zmq::ypollset_t::~ypollset_t ()  void zmq::ypollset_t::signal (int signal_)  { +printf ("++signal\n");      zmq_assert (signal_ >= 0 && signal_ < wait_signal);      if (bits.btsr (signal_, wait_signal))          sem.post ();  @@ -58,3 +59,8 @@ uint64_t zmq::ypollset_t::check ()  {      return (uint64_t) bits.xchg (0);  } + +zmq::fd_t zmq::ypollset_t::get_fd () +{ +    return retired_fd; +} diff --git a/src/ypollset.hpp b/src/ypollset.hpp index 25eb3e0..810a638 100644 --- a/src/ypollset.hpp +++ b/src/ypollset.hpp @@ -42,6 +42,7 @@ namespace zmq          void signal (int signal_);          uint64_t poll ();          uint64_t check (); +        fd_t get_fd ();      private: diff --git a/src/zmq.cpp b/src/zmq.cpp index 2dfdd48..21ac612 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -25,11 +25,16 @@  #include <new>  #include "socket_base.hpp" -#include "err.hpp" +#include "app_thread.hpp"  #include "dispatcher.hpp"  #include "msg_content.hpp"  #include "platform.hpp"  #include "stdint.hpp" +#include "err.hpp" + +#if defined ZMQ_HAVE_LINUX +#include <poll.h> +#endif  #if !defined ZMQ_HAVE_WINDOWS  #include <unistd.h> @@ -246,6 +251,116 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)      return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));  } +int zmq_poll (zmq_pollitem_t *items_, int nitems_) +{ +    //  TODO: Replace the polling mechanism by the virtualised framework +    //  used in 0MQ I/O threads. That'll make the thing work on all platforms. +#if !defined ZMQ_HAVE_LINUX +    errno = ENOTSUP; +    return -1; +#else + +    pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); +    zmq_assert (pollfds); +    int npollfds = 0; +    int nsockets = 0; + +    zmq::app_thread_t *app_thread = NULL; + +    for (int i = 0; i != nitems_; i++) { + +        //  0MQ sockets. +        if (items_ [i].socket) { + +            //  Get the app_thread the socket is living in. If there are two +            //  sockets in the same pollset with different app threads, fail. +            zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; +            if (app_thread) { +                if (app_thread != s->get_thread ()) { +                    free (pollfds); +                    errno = EFAULT; +                    return -1; +                } +            } +            else +                app_thread = s->get_thread (); + +            nsockets++; +            continue; +        } + +        //  Raw file descriptors. +        pollfds [npollfds].fd = items_ [i].fd; +        pollfds [npollfds].events = +            (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) | +            (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0); +        npollfds++; +    } + +    //  If there's at least one 0MQ socket in the pollset we have to poll +    //  for 0MQ commands. If ZMQ_POLL was not set, fail. +    if (nsockets) { +        pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd (); +        if (pollfds [npollfds].fd == zmq::retired_fd) { +            free (pollfds); +            errno = ENOTSUP; +            return -1; +        } +        pollfds [npollfds].events = POLLIN; +        npollfds++; +    } + +    int nevents = 0; +    bool initial = true; +    while (!nevents) { + +        //  Wait for activity. In the first iteration just check for events, +        //  don't wait. Waiting would prevent exiting on any events that may +        //  already be signaled on 0MQ sockets. +        int rc = poll (pollfds, npollfds, initial ? 0 : -1); +        if (rc == -1 && errno == EINTR) +            continue; +        errno_assert (rc >= 0); +        initial = false; + +        //  Process 0MQ commands if needed. +        if (nsockets && pollfds [npollfds -1].revents & POLLIN) +            app_thread->process_commands (false, false); + +        //  Check for the events. +        int pollfd_pos = 0; +        for (int i = 0; i != nitems_; i++) { + +            //  If the poll item is a raw file descriptor, simply convert +            //  the events to zmq_pollitem_t-style format. +            if (!items_ [i].socket) { +                items_ [i].revents = +                    (pollfds [pollfd_pos].revents & POLLIN ? ZMQ_POLLIN : 0) | +                    (pollfds [pollfd_pos].revents & POLLOUT ? ZMQ_POLLOUT : 0); +                if (items_ [i].revents) +                    nevents++; +                pollfd_pos++; +                continue; +            } + +            //  The poll item is a 0MQ socket. +            zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; +            items_ [i].revents = 0; +            if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ()) +                items_ [i].revents |= ZMQ_POLLOUT; +            if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ()) +                items_ [i].revents |= ZMQ_POLLIN; +            if (items_ [i].revents) +                nevents++; +        } +    } + +    free (pollfds); +    return nevents; + +#endif +} +  #if defined ZMQ_HAVE_WINDOWS  static uint64_t now () | 
