From 238640a526c419392bf2df95de196db89ea6eb73 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 26 Sep 2010 21:42:23 +0200 Subject: timers properly implemented --- src/config.hpp | 4 ---- src/devpoll.cpp | 35 +++++----------------------------- src/devpoll.hpp | 6 ------ src/epoll.cpp | 43 +++++++----------------------------------- src/epoll.hpp | 6 ------ src/kqueue.cpp | 40 ++++++--------------------------------- src/kqueue.hpp | 6 ------ src/poll.cpp | 33 +++++++------------------------- src/poll.hpp | 6 ------ src/poller_base.cpp | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++- src/poller_base.hpp | 27 ++++++++++++++++++++++++++ src/select.cpp | 37 +++++++----------------------------- src/select.hpp | 6 ------ src/zmq_connecter.cpp | 3 ++- src/zmq_connecter.hpp | 2 +- 15 files changed, 113 insertions(+), 193 deletions(-) (limited to 'src') diff --git a/src/config.hpp b/src/config.hpp index bceb41c..57c2373 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -69,10 +69,6 @@ namespace zmq // Maximum number of events the I/O thread can process in one go. max_io_events = 256, -// TODO: To be removed - // Maximal wait time for a timer (milliseconds). - max_timer_period = 100, - // How long to wait (milliseconds) till reattempting to connect. reconnect_period = 100, diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 32aca50..75df105 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -124,18 +124,6 @@ void zmq::devpoll_t::reset_pollout (handle_t handle_) devpoll_ctl (handle_, fd_table [handle_].events); } -void zmq::devpoll_t::add_timer (int timeout_, i_poll_events *events_, int id_) -{ - timers.push_back (events_); -} - -void zmq::devpoll_t::cancel_timer (i_poll_events *events_, int id_) -{ - timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); - if (it != timers.end ()) - timers.erase (it); -} - void zmq::devpoll_t::start () { worker.start (worker_routine, this); @@ -161,31 +149,18 @@ void zmq::devpoll_t::loop () fd_table [pending_list [i]].accepted = true; pending_list.clear (); - poll_req.dp_fds = &ev_buf [0]; - poll_req.dp_nfds = nfds; - poll_req.dp_timeout = timers.empty () ? -1 : max_timer_period; + // Execute any due timers. + uint64_t timeout = execute_timers (); // Wait for events. + poll_req.dp_fds = &ev_buf [0]; + poll_req.dp_nfds = nfds; + poll_req.dp_timeout = timout ? timeout : -1; int n = ioctl (devpoll_fd, DP_POLL, &poll_req); if (n == -1 && errno == EINTR) continue; errno_assert (n != -1); - // Handle timer. - if (!n) { - - // Use local list of timers as timer handlers may fill new timers - // into the original array. - timers_t t; - std::swap (timers, t); - - // Trigger all the timers. - for (timers_t::iterator it = t.begin (); it != t.end (); it ++) - (*it)->timer_event (-1); - - continue; - } - for (int i = 0; i < n; i ++) { fd_entry_t *fd_ptr = &fd_table [ev_buf [i].fd]; diff --git a/src/devpoll.hpp b/src/devpoll.hpp index ae16583..31a8068 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -51,8 +51,6 @@ namespace zmq void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (int timeout_, struct i_poll_events *events_, int id_); - void cancel_timer (struct i_poll_events *events_, int id_); void start (); void stop (); @@ -83,10 +81,6 @@ namespace zmq // Pollset manipulation function. void devpoll_ctl (fd_t fd_, short events_); - // List of all the engines waiting for the timer event. - typedef std::vector timers_t; - timers_t timers; - // If true, thread is in the process of shutting down. bool stopping; diff --git a/src/epoll.cpp b/src/epoll.cpp index 584a13f..ac0ffb1 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -117,19 +117,6 @@ void zmq::epoll_t::reset_pollout (handle_t handle_) errno_assert (rc != -1); } -void zmq::epoll_t::add_timer (int timeout_, i_poll_events *events_, int id_) -{ - timers.push_back (events_); -} - -void zmq::epoll_t::cancel_timer (i_poll_events *events_, int id_) -{ - timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); - if (it == timers.end ()) - return; - timers.erase (it); -} - void zmq::epoll_t::start () { worker.start (worker_routine, this); @@ -146,31 +133,15 @@ void zmq::epoll_t::loop () while (!stopping) { - // Wait for events. - int n; - while (true) { - n = epoll_wait (epoll_fd, &ev_buf [0], max_io_events, - timers.empty () ? -1 : max_timer_period); - if (!(n == -1 && errno == EINTR)) { - errno_assert (n != -1); - break; - } - } - - // Handle timer. - if (!n) { - - // Use local list of timers as timer handlers may fill new timers - // into the original array. - timers_t t; - std::swap (timers, t); - - // Trigger all the timers. - for (timers_t::iterator it = t.begin (); it != t.end (); it ++) - (*it)->timer_event (-1); + // Execute any due timers. + uint64_t timeout = execute_timers (); + // Wait for events. + int n = epoll_wait (epoll_fd, &ev_buf [0], max_io_events, + timeout ? timeout : -1); + if (n == -1 && errno == EINTR) continue; - } + errno_assert (n != -1); for (int i = 0; i < n; i ++) { poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr); diff --git a/src/epoll.hpp b/src/epoll.hpp index a19fc0d..015e3d8 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -53,8 +53,6 @@ namespace zmq void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (int timeout_, struct i_poll_events *events_, int id_); - void cancel_timer (struct i_poll_events *events_, int id_); void start (); void stop (); @@ -80,10 +78,6 @@ namespace zmq typedef std::vector retired_t; retired_t retired; - // List of all the engines waiting for the timer event. - typedef std::vector timers_t; - timers_t timers; - // If true, thread is in the process of shutting down. bool stopping; diff --git a/src/kqueue.cpp b/src/kqueue.cpp index 47178d3..965c5e9 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -132,18 +132,6 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_) kevent_delete (pe->fd, EVFILT_WRITE); } -void zmq::kqueue_t::add_timer (int timeout_, i_poll_events *events_, int id_) -{ - timers.push_back (events_); -} - -void zmq::kqueue_t::cancel_timer (i_poll_events *events_, int id_) -{ - timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); - if (it != timers.end ()) - timers.erase (it); -} - void zmq::kqueue_t::start () { worker.start (worker_routine, this); @@ -158,34 +146,18 @@ void zmq::kqueue_t::loop () { while (!stopping) { - struct kevent ev_buf [max_io_events]; - - // Compute time interval to wait. - timespec timeout = {max_timer_period / 1000, - (max_timer_period % 1000) * 1000000}; + // Execute any due timers. + uint64_t timeout = execute_timers (); // Wait for events. - int n = kevent (kqueue_fd, NULL, 0, - &ev_buf [0], max_io_events, timers.empty () ? NULL : &timeout); + struct kevent ev_buf [max_io_events]; + timespec ts = {timeout / 1000, (timeout % 1000) * 1000000}; + int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events, + timeout ? &ts: NULL); if (n == -1 && errno == EINTR) continue; errno_assert (n != -1); - // Handle timer. - if (!n) { - - // Use local list of timers as timer handlers may fill new timers - // into the original array. - timers_t t; - std::swap (timers, t); - - // Trigger all the timers. - for (timers_t::iterator it = t.begin (); it != t.end (); it ++) - (*it)->timer_event (-1); - - continue; - } - for (int i = 0; i < n; i ++) { poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata; diff --git a/src/kqueue.hpp b/src/kqueue.hpp index 6a27260..47d6b74 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -53,8 +53,6 @@ namespace zmq void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (int timeout_, struct i_poll_events *events_, int id_); - void cancel_timer (struct i_poll_events *events_, int id_); void start (); void stop (); @@ -87,10 +85,6 @@ namespace zmq typedef std::vector retired_t; retired_t retired; - // List of all the engines waiting for the timer event. - typedef std::vector timers_t; - timers_t timers; - // If true, thread is in the process of shutting down. bool stopping; diff --git a/src/poll.cpp b/src/poll.cpp index 9afa6b5..1cab5bb 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -109,18 +109,6 @@ void zmq::poll_t::reset_pollout (handle_t handle_) pollset [index].events &= ~((short) POLLOUT); } -void zmq::poll_t::add_timer (int timeout_, i_poll_events *events_, int id_) -{ - timers.push_back (events_); -} - -void zmq::poll_t::cancel_timer (i_poll_events *events_, int id_) -{ - timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); - if (it != timers.end ()) - timers.erase (it); -} - void zmq::poll_t::start () { worker.start (worker_routine, this); @@ -135,27 +123,20 @@ void zmq::poll_t::loop () { while (!stopping) { + // Execute any due timers. + uint64_t timeout = execute_timers (); + // Wait for events. - int rc = poll (&pollset [0], pollset.size (), - timers.empty () ? -1 : max_timer_period); + int rc = poll (&pollset [0], pollset.size (), timeout ? timeout : -1); if (rc == -1 && errno == EINTR) continue; errno_assert (rc != -1); - // Handle timer. - if (!rc) { - - // Use local list of timers as timer handlers may fill new timers - // into the original array. - timers_t t; - std::swap (timers, t); - - // Trigger all the timers. - for (timers_t::iterator it = t.begin (); it != t.end (); it ++) - (*it)->timer_event (-1); + // If there are no events (i.e. it's a timeout) there's no point + // in checking the pollset. + if (rc == 0) continue; - } for (pollset_t::size_type i = 0; i != pollset.size (); i++) { diff --git a/src/poll.hpp b/src/poll.hpp index e88c39d..07555b0 100644 --- a/src/poll.hpp +++ b/src/poll.hpp @@ -58,8 +58,6 @@ namespace zmq void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (int timeout_, struct i_poll_events *events_, int id_); - void cancel_timer (struct i_poll_events *events_, int id_); void start (); void stop (); @@ -87,10 +85,6 @@ namespace zmq // If true, there's at least one retired event source. bool retired; - // List of all the engines waiting for the timer event. - typedef std::vector timers_t; - timers_t timers; - // If true, thread is in the process of shutting down. bool stopping; diff --git a/src/poller_base.cpp b/src/poller_base.cpp index f1de7e9..d55692a 100644 --- a/src/poller_base.cpp +++ b/src/poller_base.cpp @@ -18,6 +18,7 @@ */ #include "poller_base.hpp" +#include "i_poll_events.hpp" #include "err.hpp" zmq::poller_base_t::poller_base_t () @@ -26,7 +27,7 @@ zmq::poller_base_t::poller_base_t () zmq::poller_base_t::~poller_base_t () { - // Make sure there are no fds registered on shutdown. + // Make sure there is no more load on the shutdown. zmq_assert (get_load () == 0); } @@ -42,3 +43,52 @@ void zmq::poller_base_t::adjust_load (int amount_) else if (amount_ < 0) load.sub (-amount_); } + +void zmq::poller_base_t::add_timer (int timeout_, i_poll_events *sink_, int id_) +{ + uint64_t expiration = clock.now_ms () + timeout_; + timer_info_t info = {sink_, id_}; + timers.insert (std::make_pair (expiration, info)); +} + +void zmq::poller_base_t::cancel_timer (i_poll_events *sink_, int id_) +{ + // Complexity of this operation is O(n). We assume it is rarely used. + for (timers_t::iterator it = timers.begin (); it != timers.end (); it++) + if (it->second.sink == sink_ && it->second.id == id_) { + timers.erase (it); + return; + } + + // Timer not found. + zmq_assert (false); +} + +uint64_t zmq::poller_base_t::execute_timers () +{ + // Get the current time. + uint64_t current = clock.now_ms (); + + // Execute the timers that are already due. + timers_t::iterator it = timers.begin (); + while (it != timers.end ()) { + + // If we have to wait to execute the item, same will be true about + // all the following items (multimap is sorted). Thus we can stop + // checking the subsequent timers and return the time to wait for + // the next timer (at least 1ms). + if (it->first > current) + return it->first - current; + + // Trigger the timer. + it->second.sink->timer_event (it->second.id); + + // Remove it from the list of active timers. + timers_t::iterator o = it; + ++it; + timers.erase (o); + } + + // There are no more timers. + return 0; +} diff --git a/src/poller_base.hpp b/src/poller_base.hpp index 0b0d53d..f607d94 100644 --- a/src/poller_base.hpp +++ b/src/poller_base.hpp @@ -20,6 +20,9 @@ #ifndef __ZMQ_POLLER_BASE_HPP_INCLUDED__ #define __ZMQ_POLLER_BASE_HPP_INCLUDED__ +#include + +#include "clock.hpp" #include "atomic_counter.hpp" namespace zmq @@ -36,13 +39,37 @@ namespace zmq // invoked from a different thread! int get_load (); + // Add a timeout to expire in timeout_ milliseconds. After the + // expiration timer_event on sink_ object will be called with + // argument set to id_. + void add_timer (int timeout_, struct i_poll_events *sink_, int id_); + + // Cancel the timer created by sink_ object with ID equal to id_. + void cancel_timer (struct i_poll_events *sink_, int id_); + protected: // Called by individual poller implementations to manage the load. void adjust_load (int amount_); + // Executes any timers that are due. Returns number of milliseconds + // to wait to match the next timer or 0 meaning "no timers". + uint64_t execute_timers (); + private: + // Clock instance private to this I/O thread. + clock_t clock; + + // List of active timers. + struct timer_info_t + { + struct i_poll_events *sink; + int id; + }; + typedef std::multimap timers_t; + timers_t timers; + // Load of the poller. Currently the number of file descriptors // registered. atomic_counter_t load; diff --git a/src/select.cpp b/src/select.cpp index a04ee66..f6e5133 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -133,18 +133,6 @@ void zmq::select_t::reset_pollout (handle_t handle_) FD_CLR (handle_, &source_set_out); } -void zmq::select_t::add_timer (int timeout_, i_poll_events *events_, int id_) -{ - timers.push_back (events_); -} - -void zmq::select_t::cancel_timer (i_poll_events *events_, int id_) -{ - timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); - if (it != timers.end ()) - timers.erase (it); -} - void zmq::select_t::start () { worker.start (worker_routine, this); @@ -164,14 +152,13 @@ void zmq::select_t::loop () memcpy (&writefds, &source_set_out, sizeof source_set_out); memcpy (&exceptfds, &source_set_err, sizeof source_set_err); - // Compute the timout interval. Select is free to overwrite the - // value so we have to compute it each time anew. - timeval timeout = {max_timer_period / 1000, - (max_timer_period % 1000) * 1000}; + // Execute any due timers. + uint64_t timeout = execute_timers (); // Wait for events. + struct timeval tv = {timeout / 1000, timeout % 1000 * 1000}; int rc = select (maxfd + 1, &readfds, &writefds, &exceptfds, - timers.empty () ? NULL : &timeout); + timeout ? &tv : NULL); #ifdef ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); @@ -181,20 +168,10 @@ void zmq::select_t::loop () errno_assert (rc != -1); #endif - // Handle timer. - if (!rc) { - - // Use local list of timers as timer handlers may fill new timers - // into the original array. - timers_t t; - std::swap (timers, t); - - // Trigger all the timers. - for (timers_t::iterator it = t.begin (); it != t.end (); it ++) - (*it)->timer_event (-1); - + // If there are no events (i.e. it's a timeout) there's no point + // in checking the pollset. + if (rc == 0) continue; - } for (fd_set_t::size_type i = 0; i < fds.size (); i ++) { if (fds [i].fd == retired_fd) diff --git a/src/select.hpp b/src/select.hpp index c6c7f51..121857e 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -60,8 +60,6 @@ namespace zmq void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (int timeout_, struct i_poll_events *events_, int id_); - void cancel_timer (struct i_poll_events *events_, int id_); void start (); void stop (); @@ -98,10 +96,6 @@ namespace zmq // If true, at least one file descriptor has retired. bool retired; - // List of all the engines waiting for the timer event. - typedef std::vector timers_t; - timers_t timers; - // If true, thread is shutting down. bool stopping; diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 7cb58d3..1e83529 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -92,8 +92,9 @@ void zmq::zmq_connecter_t::out_event () terminate (); } -void zmq::zmq_connecter_t::timer_event () +void zmq::zmq_connecter_t::timer_event (int id_) { + zmq_assert (id_ == reconnect_timer_id); wait = false; start_connecting (); } diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index 381e020..16bfc31 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -51,7 +51,7 @@ namespace zmq // Handlers for I/O events. void in_event (); void out_event (); - void timer_event (); + void timer_event (int id_); // Internal function to start the actual connection establishment. void start_connecting (); -- cgit v1.2.3