From cf815e8c785254d97190f223765fbbd19a1e6d52 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 26 Sep 2010 18:30:03 +0200 Subject: new interfaces for timers; the implementation is not changed yet --- src/config.hpp | 4 ++++ src/devpoll.cpp | 6 +++--- src/devpoll.hpp | 7 +++---- src/epoll.cpp | 6 +++--- src/epoll.hpp | 4 ++-- src/i_poll_events.hpp | 2 +- src/io_object.cpp | 10 +++++----- src/io_object.hpp | 6 +++--- src/io_thread.cpp | 2 +- src/io_thread.hpp | 2 +- src/kqueue.cpp | 6 +++--- src/kqueue.hpp | 4 ++-- src/poll.cpp | 6 +++--- src/poll.hpp | 4 ++-- src/select.cpp | 6 +++--- src/select.hpp | 4 ++-- src/zmq_connecter.cpp | 8 ++++---- src/zmq_connecter.hpp | 3 +++ 18 files changed, 48 insertions(+), 42 deletions(-) (limited to 'src') diff --git a/src/config.hpp b/src/config.hpp index 81b5a2a..bceb41c 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -69,9 +69,13 @@ namespace zmq // Maximum number of events the I/O thread can process in one go. max_io_events = 256, +// TODO: To be removed // Maximal wait time for a timer (milliseconds). max_timer_period = 100, + // How long to wait (milliseconds) till reattempting to connect. + reconnect_period = 100, + // Maximal delay to process command in API thread (in CPU ticks). // 3,000,000 ticks equals to 1 - 2 milliseconds on current CPUs. // Note that delay is only applied when there is continuous stream of diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 003f465..7054c2b 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -128,12 +128,12 @@ void zmq::devpoll_t::reset_pollout (handle_t handle_) devpoll_ctl (handle_, fd_table [handle_].events); } -void zmq::devpoll_t::add_timer (i_poll_events *events_) +void zmq::devpoll_t::add_timer (int timeout_, i_poll_events *events_, int id_) { timers.push_back (events_); } -void zmq::devpoll_t::cancel_timer (i_poll_events *events_) +void zmq::devpoll_t::cancel_timer (i_poll_events *events_, int id_) { timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); if (it != timers.end ()) @@ -190,7 +190,7 @@ void zmq::devpoll_t::loop () // Trigger all the timers. for (timers_t::iterator it = t.begin (); it != t.end (); it ++) - (*it)->timer_event (); + (*it)->timer_event (-1); continue; } diff --git a/src/devpoll.hpp b/src/devpoll.hpp index 019d268..00be385 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -33,8 +33,7 @@ namespace zmq { - // Implements socket polling mechanism using the Solaris-specific - // "/dev/poll" interface. + // Implements socket polling mechanism using the "/dev/poll" interface. class devpoll_t { @@ -52,8 +51,8 @@ namespace zmq void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (struct i_poll_events *events_); - void cancel_timer (struct i_poll_events *events_); + void add_timer (int timeout_, struct i_poll_events *events_, int id_); + void cancel_timer (struct i_poll_events *events_, int id_); int get_load (); void start (); void stop (); diff --git a/src/epoll.cpp b/src/epoll.cpp index e22eb8c..bbad639 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -120,12 +120,12 @@ void zmq::epoll_t::reset_pollout (handle_t handle_) errno_assert (rc != -1); } -void zmq::epoll_t::add_timer (i_poll_events *events_) +void zmq::epoll_t::add_timer (int timeout_, i_poll_events *events_, int id_) { timers.push_back (events_); } -void zmq::epoll_t::cancel_timer (i_poll_events *events_) +void zmq::epoll_t::cancel_timer (i_poll_events *events_, int id_) { timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); if (it == timers.end ()) @@ -175,7 +175,7 @@ void zmq::epoll_t::loop () // Trigger all the timers. for (timers_t::iterator it = t.begin (); it != t.end (); it ++) - (*it)->timer_event (); + (*it)->timer_event (-1); continue; } diff --git a/src/epoll.hpp b/src/epoll.hpp index 38175cb..a68f055 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -53,8 +53,8 @@ namespace zmq void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (struct i_poll_events *events_); - void cancel_timer (struct i_poll_events *events_); + void add_timer (int timeout_, struct i_poll_events *events_, int id_); + void cancel_timer (struct i_poll_events *events_, int id_); int get_load (); void start (); void stop (); diff --git a/src/i_poll_events.hpp b/src/i_poll_events.hpp index 6d474b2..8e70921 100644 --- a/src/i_poll_events.hpp +++ b/src/i_poll_events.hpp @@ -37,7 +37,7 @@ namespace zmq virtual void out_event () = 0; // Called when timer expires. - virtual void timer_event () = 0; + virtual void timer_event (int id_) = 0; }; } diff --git a/src/io_object.cpp b/src/io_object.cpp index b3b45ee..d2620a6 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -80,14 +80,14 @@ void zmq::io_object_t::reset_pollout (handle_t handle_) poller->reset_pollout (handle_); } -void zmq::io_object_t::add_timer () +void zmq::io_object_t::add_timer (int timeout_, int id_) { - poller->add_timer (this); + poller->add_timer (timeout_, this, id_); } -void zmq::io_object_t::cancel_timer () +void zmq::io_object_t::cancel_timer (int id_) { - poller->cancel_timer (this); + poller->cancel_timer (this, id_); } void zmq::io_object_t::in_event () @@ -100,7 +100,7 @@ void zmq::io_object_t::out_event () zmq_assert (false); } -void zmq::io_object_t::timer_event () +void zmq::io_object_t::timer_event (int id_) { zmq_assert (false); } diff --git a/src/io_object.hpp b/src/io_object.hpp index 284e6d1..ba69acc 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -56,13 +56,13 @@ namespace zmq void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (); - void cancel_timer (); + void add_timer (int timout_, int id_); + void cancel_timer (int id_); // i_poll_events interface implementation. void in_event (); void out_event (); - void timer_event (); + void timer_event (int id_); private: diff --git a/src/io_thread.cpp b/src/io_thread.cpp index d1d95f3..05a5eb2 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -89,7 +89,7 @@ void zmq::io_thread_t::out_event () zmq_assert (false); } -void zmq::io_thread_t::timer_event () +void zmq::io_thread_t::timer_event (int id_) { // No timers here. This function is never called. zmq_assert (false); diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 9e7c2ea..20d4ae3 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -56,7 +56,7 @@ namespace zmq // i_poll_events implementation. void in_event (); void out_event (); - void timer_event (); + void timer_event (int id_); // Used by io_objects to retrieve the assciated poller object. poller_t *get_poller (); diff --git a/src/kqueue.cpp b/src/kqueue.cpp index e1fe2fa..f76a08f 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -132,12 +132,12 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_) kevent_delete (pe->fd, EVFILT_WRITE); } -void zmq::kqueue_t::add_timer (i_poll_events *events_) +void zmq::kqueue_t::add_timer (int timeout_, i_poll_events *events_, int id_) { timers.push_back (events_); } -void zmq::kqueue_t::cancel_timer (i_poll_events *events_) +void zmq::kqueue_t::cancel_timer (i_poll_events *events_, int id_) { timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); if (it != timers.end ()) @@ -186,7 +186,7 @@ void zmq::kqueue_t::loop () // Trigger all the timers. for (timers_t::iterator it = t.begin (); it != t.end (); it ++) - (*it)->timer_event (); + (*it)->timer_event (-1); continue; } diff --git a/src/kqueue.hpp b/src/kqueue.hpp index ac28a7d..43c2a39 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -53,8 +53,8 @@ namespace zmq void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (struct i_poll_events *events_); - void cancel_timer (struct i_poll_events *events_); + void add_timer (int timeout_, struct i_poll_events *events_, int id_); + void cancel_timer (struct i_poll_events *events_, int id_); int get_load (); void start (); void stop (); diff --git a/src/poll.cpp b/src/poll.cpp index 1b203db..513c405 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -112,12 +112,12 @@ void zmq::poll_t::reset_pollout (handle_t handle_) pollset [index].events &= ~((short) POLLOUT); } -void zmq::poll_t::add_timer (i_poll_events *events_) +void zmq::poll_t::add_timer (int timeout_, i_poll_events *events_, int id_) { timers.push_back (events_); } -void zmq::poll_t::cancel_timer (i_poll_events *events_) +void zmq::poll_t::cancel_timer (i_poll_events *events_, int id_) { timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); if (it != timers.end ()) @@ -160,7 +160,7 @@ void zmq::poll_t::loop () // Trigger all the timers. for (timers_t::iterator it = t.begin (); it != t.end (); it ++) - (*it)->timer_event (); + (*it)->timer_event (-1); continue; } diff --git a/src/poll.hpp b/src/poll.hpp index f4ae35a..96d18dd 100644 --- a/src/poll.hpp +++ b/src/poll.hpp @@ -58,8 +58,8 @@ namespace zmq void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (struct i_poll_events *events_); - void cancel_timer (struct i_poll_events *events_); + void add_timer (int timeout_, struct i_poll_events *events_, int id_); + void cancel_timer (struct i_poll_events *events_, int id_); int get_load (); void start (); void stop (); diff --git a/src/select.cpp b/src/select.cpp index 59eb83e..f169239 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -136,12 +136,12 @@ void zmq::select_t::reset_pollout (handle_t handle_) FD_CLR (handle_, &source_set_out); } -void zmq::select_t::add_timer (i_poll_events *events_) +void zmq::select_t::add_timer (int timeout_, i_poll_events *events_, int id_) { timers.push_back (events_); } -void zmq::select_t::cancel_timer (i_poll_events *events_) +void zmq::select_t::cancel_timer (i_poll_events *events_, int id_) { timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); if (it != timers.end ()) @@ -199,7 +199,7 @@ void zmq::select_t::loop () // Trigger all the timers. for (timers_t::iterator it = t.begin (); it != t.end (); it ++) - (*it)->timer_event (); + (*it)->timer_event (-1); continue; } diff --git a/src/select.hpp b/src/select.hpp index 01e9fa8..ae19fe1 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -60,8 +60,8 @@ namespace zmq void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (struct i_poll_events *events_); - void cancel_timer (struct i_poll_events *events_); + void add_timer (int timeout_, struct i_poll_events *events_, int id_); + void cancel_timer (struct i_poll_events *events_, int id_); int get_load (); void start (); void stop (); diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index cfca875..7cb58d3 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -42,7 +42,7 @@ zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_, zmq::zmq_connecter_t::~zmq_connecter_t () { if (wait) - cancel_timer (); + cancel_timer (reconnect_timer_id); if (handle_valid) rm_fd (handle); } @@ -50,7 +50,7 @@ zmq::zmq_connecter_t::~zmq_connecter_t () void zmq::zmq_connecter_t::process_plug () { if (wait) - add_timer (); + add_timer (reconnect_period, reconnect_timer_id); else start_connecting (); } @@ -73,7 +73,7 @@ void zmq::zmq_connecter_t::out_event () if (fd == retired_fd) { tcp_connecter.close (); wait = true; - add_timer (); + add_timer (reconnect_period, reconnect_timer_id); return; } @@ -121,5 +121,5 @@ void zmq::zmq_connecter_t::start_connecting () // Handle any other error condition by eventual reconnect. wait = true; - add_timer (); + add_timer (reconnect_period, reconnect_timer_id); } diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index f1a4c30..381e020 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -42,6 +42,9 @@ namespace zmq private: + // ID of the timer used to delay the reconnection. + enum {reconnect_timer_id = 1}; + // Handlers for incoming commands. void process_plug (); -- cgit v1.2.3