summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/config.hpp4
-rw-r--r--src/devpoll.cpp35
-rw-r--r--src/devpoll.hpp6
-rw-r--r--src/epoll.cpp43
-rw-r--r--src/epoll.hpp6
-rw-r--r--src/kqueue.cpp40
-rw-r--r--src/kqueue.hpp6
-rw-r--r--src/poll.cpp33
-rw-r--r--src/poll.hpp6
-rw-r--r--src/poller_base.cpp52
-rw-r--r--src/poller_base.hpp27
-rw-r--r--src/select.cpp37
-rw-r--r--src/select.hpp6
-rw-r--r--src/zmq_connecter.cpp3
-rw-r--r--src/zmq_connecter.hpp2
15 files changed, 113 insertions, 193 deletions
diff --git a/src/config.hpp b/src/config.hpp
index bceb41c..57c2373 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -69,10 +69,6 @@ namespace zmq
// Maximum number of events the I/O thread can process in one go.
max_io_events = 256,
-// TODO: To be removed
- // Maximal wait time for a timer (milliseconds).
- max_timer_period = 100,
-
// How long to wait (milliseconds) till reattempting to connect.
reconnect_period = 100,
diff --git a/src/devpoll.cpp b/src/devpoll.cpp
index 32aca50..75df105 100644
--- a/src/devpoll.cpp
+++ b/src/devpoll.cpp
@@ -124,18 +124,6 @@ void zmq::devpoll_t::reset_pollout (handle_t handle_)
devpoll_ctl (handle_, fd_table [handle_].events);
}
-void zmq::devpoll_t::add_timer (int timeout_, i_poll_events *events_, int id_)
-{
- timers.push_back (events_);
-}
-
-void zmq::devpoll_t::cancel_timer (i_poll_events *events_, int id_)
-{
- timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
- if (it != timers.end ())
- timers.erase (it);
-}
-
void zmq::devpoll_t::start ()
{
worker.start (worker_routine, this);
@@ -161,31 +149,18 @@ void zmq::devpoll_t::loop ()
fd_table [pending_list [i]].accepted = true;
pending_list.clear ();
- poll_req.dp_fds = &ev_buf [0];
- poll_req.dp_nfds = nfds;
- poll_req.dp_timeout = timers.empty () ? -1 : max_timer_period;
+ // Execute any due timers.
+ uint64_t timeout = execute_timers ();
// Wait for events.
+ poll_req.dp_fds = &ev_buf [0];
+ poll_req.dp_nfds = nfds;
+ poll_req.dp_timeout = timout ? timeout : -1;
int n = ioctl (devpoll_fd, DP_POLL, &poll_req);
if (n == -1 && errno == EINTR)
continue;
errno_assert (n != -1);
- // Handle timer.
- if (!n) {
-
- // Use local list of timers as timer handlers may fill new timers
- // into the original array.
- timers_t t;
- std::swap (timers, t);
-
- // Trigger all the timers.
- for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
- (*it)->timer_event (-1);
-
- continue;
- }
-
for (int i = 0; i < n; i ++) {
fd_entry_t *fd_ptr = &fd_table [ev_buf [i].fd];
diff --git a/src/devpoll.hpp b/src/devpoll.hpp
index ae16583..31a8068 100644
--- a/src/devpoll.hpp
+++ b/src/devpoll.hpp
@@ -51,8 +51,6 @@ namespace zmq
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer (int timeout_, struct i_poll_events *events_, int id_);
- void cancel_timer (struct i_poll_events *events_, int id_);
void start ();
void stop ();
@@ -83,10 +81,6 @@ namespace zmq
// Pollset manipulation function.
void devpoll_ctl (fd_t fd_, short events_);
- // List of all the engines waiting for the timer event.
- typedef std::vector <struct i_poll_events*> timers_t;
- timers_t timers;
-
// If true, thread is in the process of shutting down.
bool stopping;
diff --git a/src/epoll.cpp b/src/epoll.cpp
index 584a13f..ac0ffb1 100644
--- a/src/epoll.cpp
+++ b/src/epoll.cpp
@@ -117,19 +117,6 @@ void zmq::epoll_t::reset_pollout (handle_t handle_)
errno_assert (rc != -1);
}
-void zmq::epoll_t::add_timer (int timeout_, i_poll_events *events_, int id_)
-{
- timers.push_back (events_);
-}
-
-void zmq::epoll_t::cancel_timer (i_poll_events *events_, int id_)
-{
- timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
- if (it == timers.end ())
- return;
- timers.erase (it);
-}
-
void zmq::epoll_t::start ()
{
worker.start (worker_routine, this);
@@ -146,31 +133,15 @@ void zmq::epoll_t::loop ()
while (!stopping) {
- // Wait for events.
- int n;
- while (true) {
- n = epoll_wait (epoll_fd, &ev_buf [0], max_io_events,
- timers.empty () ? -1 : max_timer_period);
- if (!(n == -1 && errno == EINTR)) {
- errno_assert (n != -1);
- break;
- }
- }
-
- // Handle timer.
- if (!n) {
-
- // Use local list of timers as timer handlers may fill new timers
- // into the original array.
- timers_t t;
- std::swap (timers, t);
-
- // Trigger all the timers.
- for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
- (*it)->timer_event (-1);
+ // Execute any due timers.
+ uint64_t timeout = execute_timers ();
+ // Wait for events.
+ int n = epoll_wait (epoll_fd, &ev_buf [0], max_io_events,
+ timeout ? timeout : -1);
+ if (n == -1 && errno == EINTR)
continue;
- }
+ errno_assert (n != -1);
for (int i = 0; i < n; i ++) {
poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr);
diff --git a/src/epoll.hpp b/src/epoll.hpp
index a19fc0d..015e3d8 100644
--- a/src/epoll.hpp
+++ b/src/epoll.hpp
@@ -53,8 +53,6 @@ namespace zmq
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer (int timeout_, struct i_poll_events *events_, int id_);
- void cancel_timer (struct i_poll_events *events_, int id_);
void start ();
void stop ();
@@ -80,10 +78,6 @@ namespace zmq
typedef std::vector <poll_entry_t*> retired_t;
retired_t retired;
- // List of all the engines waiting for the timer event.
- typedef std::vector <struct i_poll_events*> timers_t;
- timers_t timers;
-
// If true, thread is in the process of shutting down.
bool stopping;
diff --git a/src/kqueue.cpp b/src/kqueue.cpp
index 47178d3..965c5e9 100644
--- a/src/kqueue.cpp
+++ b/src/kqueue.cpp
@@ -132,18 +132,6 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_)
kevent_delete (pe->fd, EVFILT_WRITE);
}
-void zmq::kqueue_t::add_timer (int timeout_, i_poll_events *events_, int id_)
-{
- timers.push_back (events_);
-}
-
-void zmq::kqueue_t::cancel_timer (i_poll_events *events_, int id_)
-{
- timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
- if (it != timers.end ())
- timers.erase (it);
-}
-
void zmq::kqueue_t::start ()
{
worker.start (worker_routine, this);
@@ -158,34 +146,18 @@ void zmq::kqueue_t::loop ()
{
while (!stopping) {
- struct kevent ev_buf [max_io_events];
-
- // Compute time interval to wait.
- timespec timeout = {max_timer_period / 1000,
- (max_timer_period % 1000) * 1000000};
+ // Execute any due timers.
+ uint64_t timeout = execute_timers ();
// Wait for events.
- int n = kevent (kqueue_fd, NULL, 0,
- &ev_buf [0], max_io_events, timers.empty () ? NULL : &timeout);
+ struct kevent ev_buf [max_io_events];
+ timespec ts = {timeout / 1000, (timeout % 1000) * 1000000};
+ int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events,
+ timeout ? &ts: NULL);
if (n == -1 && errno == EINTR)
continue;
errno_assert (n != -1);
- // Handle timer.
- if (!n) {
-
- // Use local list of timers as timer handlers may fill new timers
- // into the original array.
- timers_t t;
- std::swap (timers, t);
-
- // Trigger all the timers.
- for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
- (*it)->timer_event (-1);
-
- continue;
- }
-
for (int i = 0; i < n; i ++) {
poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata;
diff --git a/src/kqueue.hpp b/src/kqueue.hpp
index 6a27260..47d6b74 100644
--- a/src/kqueue.hpp
+++ b/src/kqueue.hpp
@@ -53,8 +53,6 @@ namespace zmq
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer (int timeout_, struct i_poll_events *events_, int id_);
- void cancel_timer (struct i_poll_events *events_, int id_);
void start ();
void stop ();
@@ -87,10 +85,6 @@ namespace zmq
typedef std::vector <poll_entry_t*> retired_t;
retired_t retired;
- // List of all the engines waiting for the timer event.
- typedef std::vector <struct i_poll_events*> timers_t;
- timers_t timers;
-
// If true, thread is in the process of shutting down.
bool stopping;
diff --git a/src/poll.cpp b/src/poll.cpp
index 9afa6b5..1cab5bb 100644
--- a/src/poll.cpp
+++ b/src/poll.cpp
@@ -109,18 +109,6 @@ void zmq::poll_t::reset_pollout (handle_t handle_)
pollset [index].events &= ~((short) POLLOUT);
}
-void zmq::poll_t::add_timer (int timeout_, i_poll_events *events_, int id_)
-{
- timers.push_back (events_);
-}
-
-void zmq::poll_t::cancel_timer (i_poll_events *events_, int id_)
-{
- timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
- if (it != timers.end ())
- timers.erase (it);
-}
-
void zmq::poll_t::start ()
{
worker.start (worker_routine, this);
@@ -135,27 +123,20 @@ void zmq::poll_t::loop ()
{
while (!stopping) {
+ // Execute any due timers.
+ uint64_t timeout = execute_timers ();
+
// Wait for events.
- int rc = poll (&pollset [0], pollset.size (),
- timers.empty () ? -1 : max_timer_period);
+ int rc = poll (&pollset [0], pollset.size (), timeout ? timeout : -1);
if (rc == -1 && errno == EINTR)
continue;
errno_assert (rc != -1);
- // Handle timer.
- if (!rc) {
-
- // Use local list of timers as timer handlers may fill new timers
- // into the original array.
- timers_t t;
- std::swap (timers, t);
-
- // Trigger all the timers.
- for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
- (*it)->timer_event (-1);
+ // If there are no events (i.e. it's a timeout) there's no point
+ // in checking the pollset.
+ if (rc == 0)
continue;
- }
for (pollset_t::size_type i = 0; i != pollset.size (); i++) {
diff --git a/src/poll.hpp b/src/poll.hpp
index e88c39d..07555b0 100644
--- a/src/poll.hpp
+++ b/src/poll.hpp
@@ -58,8 +58,6 @@ namespace zmq
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer (int timeout_, struct i_poll_events *events_, int id_);
- void cancel_timer (struct i_poll_events *events_, int id_);
void start ();
void stop ();
@@ -87,10 +85,6 @@ namespace zmq
// If true, there's at least one retired event source.
bool retired;
- // List of all the engines waiting for the timer event.
- typedef std::vector <struct i_poll_events*> timers_t;
- timers_t timers;
-
// If true, thread is in the process of shutting down.
bool stopping;
diff --git a/src/poller_base.cpp b/src/poller_base.cpp
index f1de7e9..d55692a 100644
--- a/src/poller_base.cpp
+++ b/src/poller_base.cpp
@@ -18,6 +18,7 @@
*/
#include "poller_base.hpp"
+#include "i_poll_events.hpp"
#include "err.hpp"
zmq::poller_base_t::poller_base_t ()
@@ -26,7 +27,7 @@ zmq::poller_base_t::poller_base_t ()
zmq::poller_base_t::~poller_base_t ()
{
- // Make sure there are no fds registered on shutdown.
+ // Make sure there is no more load on the shutdown.
zmq_assert (get_load () == 0);
}
@@ -42,3 +43,52 @@ void zmq::poller_base_t::adjust_load (int amount_)
else if (amount_ < 0)
load.sub (-amount_);
}
+
+void zmq::poller_base_t::add_timer (int timeout_, i_poll_events *sink_, int id_)
+{
+ uint64_t expiration = clock.now_ms () + timeout_;
+ timer_info_t info = {sink_, id_};
+ timers.insert (std::make_pair (expiration, info));
+}
+
+void zmq::poller_base_t::cancel_timer (i_poll_events *sink_, int id_)
+{
+ // Complexity of this operation is O(n). We assume it is rarely used.
+ for (timers_t::iterator it = timers.begin (); it != timers.end (); it++)
+ if (it->second.sink == sink_ && it->second.id == id_) {
+ timers.erase (it);
+ return;
+ }
+
+ // Timer not found.
+ zmq_assert (false);
+}
+
+uint64_t zmq::poller_base_t::execute_timers ()
+{
+ // Get the current time.
+ uint64_t current = clock.now_ms ();
+
+ // Execute the timers that are already due.
+ timers_t::iterator it = timers.begin ();
+ while (it != timers.end ()) {
+
+ // If we have to wait to execute the item, same will be true about
+ // all the following items (multimap is sorted). Thus we can stop
+ // checking the subsequent timers and return the time to wait for
+ // the next timer (at least 1ms).
+ if (it->first > current)
+ return it->first - current;
+
+ // Trigger the timer.
+ it->second.sink->timer_event (it->second.id);
+
+ // Remove it from the list of active timers.
+ timers_t::iterator o = it;
+ ++it;
+ timers.erase (o);
+ }
+
+ // There are no more timers.
+ return 0;
+}
diff --git a/src/poller_base.hpp b/src/poller_base.hpp
index 0b0d53d..f607d94 100644
--- a/src/poller_base.hpp
+++ b/src/poller_base.hpp
@@ -20,6 +20,9 @@
#ifndef __ZMQ_POLLER_BASE_HPP_INCLUDED__
#define __ZMQ_POLLER_BASE_HPP_INCLUDED__
+#include <map>
+
+#include "clock.hpp"
#include "atomic_counter.hpp"
namespace zmq
@@ -36,13 +39,37 @@ namespace zmq
// invoked from a different thread!
int get_load ();
+ // Add a timeout to expire in timeout_ milliseconds. After the
+ // expiration timer_event on sink_ object will be called with
+ // argument set to id_.
+ void add_timer (int timeout_, struct i_poll_events *sink_, int id_);
+
+ // Cancel the timer created by sink_ object with ID equal to id_.
+ void cancel_timer (struct i_poll_events *sink_, int id_);
+
protected:
// Called by individual poller implementations to manage the load.
void adjust_load (int amount_);
+ // Executes any timers that are due. Returns number of milliseconds
+ // to wait to match the next timer or 0 meaning "no timers".
+ uint64_t execute_timers ();
+
private:
+ // Clock instance private to this I/O thread.
+ clock_t clock;
+
+ // List of active timers.
+ struct timer_info_t
+ {
+ struct i_poll_events *sink;
+ int id;
+ };
+ typedef std::multimap <uint64_t, timer_info_t> timers_t;
+ timers_t timers;
+
// Load of the poller. Currently the number of file descriptors
// registered.
atomic_counter_t load;
diff --git a/src/select.cpp b/src/select.cpp
index a04ee66..f6e5133 100644
--- a/src/select.cpp
+++ b/src/select.cpp
@@ -133,18 +133,6 @@ void zmq::select_t::reset_pollout (handle_t handle_)
FD_CLR (handle_, &source_set_out);
}
-void zmq::select_t::add_timer (int timeout_, i_poll_events *events_, int id_)
-{
- timers.push_back (events_);
-}
-
-void zmq::select_t::cancel_timer (i_poll_events *events_, int id_)
-{
- timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
- if (it != timers.end ())
- timers.erase (it);
-}
-
void zmq::select_t::start ()
{
worker.start (worker_routine, this);
@@ -164,14 +152,13 @@ void zmq::select_t::loop ()
memcpy (&writefds, &source_set_out, sizeof source_set_out);
memcpy (&exceptfds, &source_set_err, sizeof source_set_err);
- // Compute the timout interval. Select is free to overwrite the
- // value so we have to compute it each time anew.
- timeval timeout = {max_timer_period / 1000,
- (max_timer_period % 1000) * 1000};
+ // Execute any due timers.
+ uint64_t timeout = execute_timers ();
// Wait for events.
+ struct timeval tv = {timeout / 1000, timeout % 1000 * 1000};
int rc = select (maxfd + 1, &readfds, &writefds, &exceptfds,
- timers.empty () ? NULL : &timeout);
+ timeout ? &tv : NULL);
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
@@ -181,20 +168,10 @@ void zmq::select_t::loop ()
errno_assert (rc != -1);
#endif
- // Handle timer.
- if (!rc) {
-
- // Use local list of timers as timer handlers may fill new timers
- // into the original array.
- timers_t t;
- std::swap (timers, t);
-
- // Trigger all the timers.
- for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
- (*it)->timer_event (-1);
-
+ // If there are no events (i.e. it's a timeout) there's no point
+ // in checking the pollset.
+ if (rc == 0)
continue;
- }
for (fd_set_t::size_type i = 0; i < fds.size (); i ++) {
if (fds [i].fd == retired_fd)
diff --git a/src/select.hpp b/src/select.hpp
index c6c7f51..121857e 100644
--- a/src/select.hpp
+++ b/src/select.hpp
@@ -60,8 +60,6 @@ namespace zmq
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer (int timeout_, struct i_poll_events *events_, int id_);
- void cancel_timer (struct i_poll_events *events_, int id_);
void start ();
void stop ();
@@ -98,10 +96,6 @@ namespace zmq
// If true, at least one file descriptor has retired.
bool retired;
- // List of all the engines waiting for the timer event.
- typedef std::vector <struct i_poll_events*> timers_t;
- timers_t timers;
-
// If true, thread is shutting down.
bool stopping;
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index 7cb58d3..1e83529 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -92,8 +92,9 @@ void zmq::zmq_connecter_t::out_event ()
terminate ();
}
-void zmq::zmq_connecter_t::timer_event ()
+void zmq::zmq_connecter_t::timer_event (int id_)
{
+ zmq_assert (id_ == reconnect_timer_id);
wait = false;
start_connecting ();
}
diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp
index 381e020..16bfc31 100644
--- a/src/zmq_connecter.hpp
+++ b/src/zmq_connecter.hpp
@@ -51,7 +51,7 @@ namespace zmq
// Handlers for I/O events.
void in_event ();
void out_event ();
- void timer_event ();
+ void timer_event (int id_);
// Internal function to start the actual connection establishment.
void start_connecting ();