diff options
| -rw-r--r-- | src/config.hpp | 4 | ||||
| -rw-r--r-- | src/devpoll.cpp | 35 | ||||
| -rw-r--r-- | src/devpoll.hpp | 6 | ||||
| -rw-r--r-- | src/epoll.cpp | 43 | ||||
| -rw-r--r-- | src/epoll.hpp | 6 | ||||
| -rw-r--r-- | src/kqueue.cpp | 40 | ||||
| -rw-r--r-- | src/kqueue.hpp | 6 | ||||
| -rw-r--r-- | src/poll.cpp | 33 | ||||
| -rw-r--r-- | src/poll.hpp | 6 | ||||
| -rw-r--r-- | src/poller_base.cpp | 52 | ||||
| -rw-r--r-- | src/poller_base.hpp | 27 | ||||
| -rw-r--r-- | src/select.cpp | 37 | ||||
| -rw-r--r-- | src/select.hpp | 6 | ||||
| -rw-r--r-- | src/zmq_connecter.cpp | 3 | ||||
| -rw-r--r-- | src/zmq_connecter.hpp | 2 | 
15 files changed, 113 insertions, 193 deletions
diff --git a/src/config.hpp b/src/config.hpp index bceb41c..57c2373 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -69,10 +69,6 @@ namespace zmq          //  Maximum number of events the I/O thread can process in one go.          max_io_events = 256, -//  TODO: To be removed -        //  Maximal wait time for a timer (milliseconds). -        max_timer_period = 100, -          //  How long to wait (milliseconds) till reattempting to connect.          reconnect_period = 100, diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 32aca50..75df105 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -124,18 +124,6 @@ void zmq::devpoll_t::reset_pollout (handle_t handle_)      devpoll_ctl (handle_, fd_table [handle_].events);  } -void zmq::devpoll_t::add_timer (int timeout_, i_poll_events *events_, int id_) -{ -     timers.push_back (events_); -} - -void zmq::devpoll_t::cancel_timer (i_poll_events *events_, int id_) -{ -    timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); -    if (it != timers.end ()) -        timers.erase (it); -} -  void zmq::devpoll_t::start ()  {      worker.start (worker_routine, this); @@ -161,31 +149,18 @@ void zmq::devpoll_t::loop ()              fd_table [pending_list [i]].accepted = true;          pending_list.clear (); -        poll_req.dp_fds = &ev_buf [0]; -        poll_req.dp_nfds = nfds; -        poll_req.dp_timeout = timers.empty () ? -1 : max_timer_period; +        //  Execute any due timers. +        uint64_t timeout = execute_timers ();          //  Wait for events. +        poll_req.dp_fds = &ev_buf [0]; +        poll_req.dp_nfds = nfds; +        poll_req.dp_timeout = timout ? timeout : -1;          int n = ioctl (devpoll_fd, DP_POLL, &poll_req);          if (n == -1 && errno == EINTR)              continue;          errno_assert (n != -1); -        //  Handle timer. -        if (!n) { - -            //  Use local list of timers as timer handlers may fill new timers -            //  into the original array. -            timers_t t; -            std::swap (timers, t); - -            //  Trigger all the timers. -            for (timers_t::iterator it = t.begin (); it != t.end (); it ++) -                (*it)->timer_event (-1); - -            continue; -        } -          for (int i = 0; i < n; i ++) {              fd_entry_t *fd_ptr = &fd_table [ev_buf [i].fd]; diff --git a/src/devpoll.hpp b/src/devpoll.hpp index ae16583..31a8068 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -51,8 +51,6 @@ namespace zmq          void reset_pollin (handle_t handle_);          void set_pollout (handle_t handle_);          void reset_pollout (handle_t handle_); -        void add_timer (int timeout_, struct i_poll_events *events_, int id_); -        void cancel_timer (struct i_poll_events *events_, int id_);          void start ();          void stop (); @@ -83,10 +81,6 @@ namespace zmq          //  Pollset manipulation function.          void devpoll_ctl (fd_t fd_, short events_); -        //  List of all the engines waiting for the timer event. -        typedef std::vector <struct i_poll_events*> timers_t; -        timers_t timers; -          //  If true, thread is in the process of shutting down.          bool stopping; diff --git a/src/epoll.cpp b/src/epoll.cpp index 584a13f..ac0ffb1 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -117,19 +117,6 @@ void zmq::epoll_t::reset_pollout (handle_t handle_)      errno_assert (rc != -1);  } -void zmq::epoll_t::add_timer (int timeout_, i_poll_events *events_, int id_) -{ -     timers.push_back (events_); -} - -void zmq::epoll_t::cancel_timer (i_poll_events *events_, int id_) -{ -    timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); -    if (it == timers.end ()) -        return; -    timers.erase (it); -} -  void zmq::epoll_t::start ()  {      worker.start (worker_routine, this); @@ -146,31 +133,15 @@ void zmq::epoll_t::loop ()      while (!stopping) { -        //  Wait for events. -        int n; -        while (true) { -            n = epoll_wait (epoll_fd, &ev_buf [0], max_io_events, -                timers.empty () ? -1 : max_timer_period); -            if (!(n == -1 && errno == EINTR)) { -                errno_assert (n != -1); -                break; -            } -        } - -        //  Handle timer. -        if (!n) { - -            //  Use local list of timers as timer handlers may fill new timers -            //  into the original array. -            timers_t t; -            std::swap (timers, t); - -            //  Trigger all the timers. -            for (timers_t::iterator it = t.begin (); it != t.end (); it ++) -                (*it)->timer_event (-1); +        //  Execute any due timers. +        uint64_t timeout = execute_timers (); +        //  Wait for events. +        int n = epoll_wait (epoll_fd, &ev_buf [0], max_io_events, +            timeout ? timeout : -1); +        if (n == -1 && errno == EINTR)              continue; -        } +        errno_assert (n != -1);          for (int i = 0; i < n; i ++) {              poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr); diff --git a/src/epoll.hpp b/src/epoll.hpp index a19fc0d..015e3d8 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -53,8 +53,6 @@ namespace zmq          void reset_pollin (handle_t handle_);          void set_pollout (handle_t handle_);          void reset_pollout (handle_t handle_); -        void add_timer (int timeout_, struct i_poll_events *events_, int id_); -        void cancel_timer (struct i_poll_events *events_, int id_);          void start ();          void stop (); @@ -80,10 +78,6 @@ namespace zmq          typedef std::vector <poll_entry_t*> retired_t;          retired_t retired; -        //  List of all the engines waiting for the timer event. -        typedef std::vector <struct i_poll_events*> timers_t; -        timers_t timers; -          //  If true, thread is in the process of shutting down.          bool stopping; diff --git a/src/kqueue.cpp b/src/kqueue.cpp index 47178d3..965c5e9 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -132,18 +132,6 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_)      kevent_delete (pe->fd, EVFILT_WRITE);  } -void zmq::kqueue_t::add_timer (int timeout_, i_poll_events *events_, int id_) -{ -     timers.push_back (events_); -} - -void zmq::kqueue_t::cancel_timer (i_poll_events *events_, int id_) -{ -    timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); -    if (it != timers.end ()) -        timers.erase (it); -} -  void zmq::kqueue_t::start ()  {      worker.start (worker_routine, this); @@ -158,34 +146,18 @@ void zmq::kqueue_t::loop ()  {      while (!stopping) { -        struct kevent ev_buf [max_io_events]; - -        //  Compute time interval to wait. -        timespec timeout = {max_timer_period / 1000, -            (max_timer_period % 1000) * 1000000}; +        //  Execute any due timers. +        uint64_t timeout = execute_timers ();          //  Wait for events. -        int n = kevent (kqueue_fd, NULL, 0, -             &ev_buf [0], max_io_events, timers.empty () ? NULL : &timeout); +        struct kevent ev_buf [max_io_events]; +        timespec ts = {timeout / 1000, (timeout % 1000) * 1000000}; +        int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events, +            timeout ? &ts: NULL);          if (n == -1 && errno == EINTR)              continue;          errno_assert (n != -1); -        //  Handle timer. -        if (!n) { - -            //  Use local list of timers as timer handlers may fill new timers -            //  into the original array. -            timers_t t; -            std::swap (timers, t); - -            //  Trigger all the timers. -            for (timers_t::iterator it = t.begin (); it != t.end (); it ++) -                (*it)->timer_event (-1); - -            continue; -        } -          for (int i = 0; i < n; i ++) {              poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata; diff --git a/src/kqueue.hpp b/src/kqueue.hpp index 6a27260..47d6b74 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -53,8 +53,6 @@ namespace zmq          void reset_pollin (handle_t handle_);          void set_pollout (handle_t handle_);          void reset_pollout (handle_t handle_); -        void add_timer (int timeout_, struct i_poll_events *events_, int id_); -        void cancel_timer (struct i_poll_events *events_, int id_);          void start ();          void stop (); @@ -87,10 +85,6 @@ namespace zmq          typedef std::vector <poll_entry_t*> retired_t;          retired_t retired; -        //  List of all the engines waiting for the timer event. -        typedef std::vector <struct i_poll_events*> timers_t; -        timers_t timers; -          //  If true, thread is in the process of shutting down.          bool stopping; diff --git a/src/poll.cpp b/src/poll.cpp index 9afa6b5..1cab5bb 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -109,18 +109,6 @@ void zmq::poll_t::reset_pollout (handle_t handle_)      pollset [index].events &= ~((short) POLLOUT);  } -void zmq::poll_t::add_timer (int timeout_, i_poll_events *events_, int id_) -{ -     timers.push_back (events_); -} - -void zmq::poll_t::cancel_timer (i_poll_events *events_, int id_) -{ -    timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); -    if (it != timers.end ()) -        timers.erase (it); -} -  void zmq::poll_t::start ()  {      worker.start (worker_routine, this); @@ -135,27 +123,20 @@ void zmq::poll_t::loop ()  {      while (!stopping) { +        //  Execute any due timers. +        uint64_t timeout = execute_timers (); +          //  Wait for events. -        int rc = poll (&pollset [0], pollset.size (), -            timers.empty () ? -1 : max_timer_period); +        int rc = poll (&pollset [0], pollset.size (), timeout ? timeout : -1);          if (rc == -1 && errno == EINTR)              continue;          errno_assert (rc != -1); -        //  Handle timer. -        if (!rc) { - -            //  Use local list of timers as timer handlers may fill new timers -            //  into the original array. -            timers_t t; -            std::swap (timers, t); - -            //  Trigger all the timers. -            for (timers_t::iterator it = t.begin (); it != t.end (); it ++) -                (*it)->timer_event (-1); +        //  If there are no events (i.e. it's a timeout) there's no point +        //  in checking the pollset. +        if (rc == 0)              continue; -        }          for (pollset_t::size_type i = 0; i != pollset.size (); i++) { diff --git a/src/poll.hpp b/src/poll.hpp index e88c39d..07555b0 100644 --- a/src/poll.hpp +++ b/src/poll.hpp @@ -58,8 +58,6 @@ namespace zmq          void reset_pollin (handle_t handle_);          void set_pollout (handle_t handle_);          void reset_pollout (handle_t handle_); -        void add_timer (int timeout_, struct i_poll_events *events_, int id_); -        void cancel_timer (struct i_poll_events *events_, int id_);          void start ();          void stop (); @@ -87,10 +85,6 @@ namespace zmq          //  If true, there's at least one retired event source.          bool retired; -        //  List of all the engines waiting for the timer event. -        typedef std::vector <struct i_poll_events*> timers_t; -        timers_t timers; -          //  If true, thread is in the process of shutting down.          bool stopping; diff --git a/src/poller_base.cpp b/src/poller_base.cpp index f1de7e9..d55692a 100644 --- a/src/poller_base.cpp +++ b/src/poller_base.cpp @@ -18,6 +18,7 @@  */  #include "poller_base.hpp" +#include "i_poll_events.hpp"  #include "err.hpp"  zmq::poller_base_t::poller_base_t () @@ -26,7 +27,7 @@ zmq::poller_base_t::poller_base_t ()  zmq::poller_base_t::~poller_base_t ()  { -    //  Make sure there are no fds registered on shutdown. +    //  Make sure there is no more load on the shutdown.      zmq_assert (get_load () == 0);  } @@ -42,3 +43,52 @@ void zmq::poller_base_t::adjust_load (int amount_)      else if (amount_ < 0)          load.sub (-amount_);  } + +void zmq::poller_base_t::add_timer (int timeout_, i_poll_events *sink_, int id_) +{ +    uint64_t expiration = clock.now_ms () + timeout_; +    timer_info_t info = {sink_, id_}; +    timers.insert (std::make_pair (expiration, info)); +} + +void zmq::poller_base_t::cancel_timer (i_poll_events *sink_, int id_) +{ +    //  Complexity of this operation is O(n). We assume it is rarely used. +    for (timers_t::iterator it = timers.begin (); it != timers.end (); it++) +        if (it->second.sink == sink_ && it->second.id == id_) { +            timers.erase (it); +            return; +        } + +    //  Timer not found. +    zmq_assert (false); +} + +uint64_t zmq::poller_base_t::execute_timers () +{ +    //  Get the current time. +    uint64_t current = clock.now_ms (); + +    //   Execute the timers that are already due. +    timers_t::iterator it = timers.begin (); +    while (it != timers.end ()) { + +        //  If we have to wait to execute the item, same will be true about +        //  all the following items (multimap is sorted). Thus we can stop +        //  checking the subsequent timers and return the time to wait for +        //  the next timer (at least 1ms). +        if (it->first > current) +            return it->first - current; + +        //  Trigger the timer. +        it->second.sink->timer_event (it->second.id); + +        //  Remove it from the list of active timers. +        timers_t::iterator o = it; +        ++it; +        timers.erase (o); +    } + +    //  There are no more timers. +    return 0; +} diff --git a/src/poller_base.hpp b/src/poller_base.hpp index 0b0d53d..f607d94 100644 --- a/src/poller_base.hpp +++ b/src/poller_base.hpp @@ -20,6 +20,9 @@  #ifndef __ZMQ_POLLER_BASE_HPP_INCLUDED__  #define __ZMQ_POLLER_BASE_HPP_INCLUDED__ +#include <map> + +#include "clock.hpp"  #include "atomic_counter.hpp"  namespace zmq @@ -36,13 +39,37 @@ namespace zmq          //  invoked from a different thread!          int get_load (); +        //  Add a timeout to expire in timeout_ milliseconds. After the +        //  expiration timer_event on sink_ object will be called with +        //  argument set to id_. +        void add_timer (int timeout_, struct i_poll_events *sink_, int id_); + +        //  Cancel the timer created by sink_ object with ID equal to id_. +        void cancel_timer (struct i_poll_events *sink_, int id_); +      protected:          //  Called by individual poller implementations to manage the load.          void adjust_load (int amount_); +        //  Executes any timers that are due. Returns number of milliseconds +        //  to wait to match the next timer or 0 meaning "no timers". +        uint64_t execute_timers (); +      private: +        //  Clock instance private to this I/O thread. +        clock_t clock; + +        //  List of active timers. +        struct timer_info_t +        { +            struct i_poll_events *sink; +            int id; +        }; +        typedef std::multimap <uint64_t, timer_info_t> timers_t; +        timers_t timers; +          //  Load of the poller. Currently the number of file descriptors          //  registered.          atomic_counter_t load; diff --git a/src/select.cpp b/src/select.cpp index a04ee66..f6e5133 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -133,18 +133,6 @@ void zmq::select_t::reset_pollout (handle_t handle_)      FD_CLR (handle_, &source_set_out);  } -void zmq::select_t::add_timer (int timeout_, i_poll_events *events_, int id_) -{ -    timers.push_back (events_); -} - -void zmq::select_t::cancel_timer (i_poll_events *events_, int id_) -{ -    timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); -    if (it != timers.end ()) -        timers.erase (it); -} -  void zmq::select_t::start ()  {      worker.start (worker_routine, this); @@ -164,14 +152,13 @@ void zmq::select_t::loop ()          memcpy (&writefds, &source_set_out, sizeof source_set_out);          memcpy (&exceptfds, &source_set_err, sizeof source_set_err); -        //  Compute the timout interval. Select is free to overwrite the -        //  value so we have to compute it each time anew. -        timeval timeout = {max_timer_period / 1000, -            (max_timer_period % 1000) * 1000}; +        //  Execute any due timers. +        uint64_t timeout = execute_timers ();          //  Wait for events. +        struct timeval tv = {timeout / 1000, timeout % 1000 * 1000};          int rc = select (maxfd + 1, &readfds, &writefds, &exceptfds, -            timers.empty () ? NULL : &timeout); +            timeout ? &tv : NULL);  #ifdef ZMQ_HAVE_WINDOWS          wsa_assert (rc != SOCKET_ERROR); @@ -181,20 +168,10 @@ void zmq::select_t::loop ()          errno_assert (rc != -1);  #endif -        //  Handle timer. -        if (!rc) { - -            //  Use local list of timers as timer handlers may fill new timers -            //  into the original array. -            timers_t t; -            std::swap (timers, t); - -            //  Trigger all the timers. -            for (timers_t::iterator it = t.begin (); it != t.end (); it ++) -                (*it)->timer_event (-1); - +        //  If there are no events (i.e. it's a timeout) there's no point +        //  in checking the pollset. +        if (rc == 0)              continue; -        }          for (fd_set_t::size_type i = 0; i < fds.size (); i ++) {              if (fds [i].fd == retired_fd) diff --git a/src/select.hpp b/src/select.hpp index c6c7f51..121857e 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -60,8 +60,6 @@ namespace zmq          void reset_pollin (handle_t handle_);          void set_pollout (handle_t handle_);          void reset_pollout (handle_t handle_); -        void add_timer (int timeout_, struct i_poll_events *events_, int id_); -        void cancel_timer (struct i_poll_events *events_, int id_);          void start ();          void stop (); @@ -98,10 +96,6 @@ namespace zmq          //  If true, at least one file descriptor has retired.          bool retired; -        //  List of all the engines waiting for the timer event. -        typedef std::vector <struct i_poll_events*> timers_t; -        timers_t timers; -          //  If true, thread is shutting down.          bool stopping; diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 7cb58d3..1e83529 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -92,8 +92,9 @@ void zmq::zmq_connecter_t::out_event ()      terminate ();  } -void zmq::zmq_connecter_t::timer_event () +void zmq::zmq_connecter_t::timer_event (int id_)  { +    zmq_assert (id_ == reconnect_timer_id);      wait = false;      start_connecting ();  } diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index 381e020..16bfc31 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -51,7 +51,7 @@ namespace zmq          //  Handlers for I/O events.          void in_event ();          void out_event (); -        void timer_event (); +        void timer_event (int id_);          //  Internal function to start the actual connection establishment.          void start_connecting ();  | 
