diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2012-02-16 10:05:01 +0900 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2012-02-16 10:05:01 +0900 |
commit | 2df873a435ff139cf9d1b10b666d75e6dc6da442 (patch) | |
tree | 042b0a349ca84919041fb37df7e5a3b7195d065d /src | |
parent | b67f88a7d6322a293ac3e3be9d6df9f358509221 (diff) |
Timers identified by dynamically generated handles
Timers are not longer identified by hard-wired IDs.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/io_object.cpp | 11 | ||||
-rw-r--r-- | src/io_object.hpp | 6 | ||||
-rw-r--r-- | src/io_thread.cpp | 2 | ||||
-rw-r--r-- | src/io_thread.hpp | 2 | ||||
-rw-r--r-- | src/ipc_connecter.cpp | 17 | ||||
-rw-r--r-- | src/ipc_connecter.hpp | 5 | ||||
-rw-r--r-- | src/monitor.cpp | 14 | ||||
-rw-r--r-- | src/monitor.hpp | 7 | ||||
-rw-r--r-- | src/pgm_receiver.cpp | 34 | ||||
-rw-r--r-- | src/pgm_receiver.hpp | 11 | ||||
-rw-r--r-- | src/pgm_sender.cpp | 48 | ||||
-rw-r--r-- | src/pgm_sender.hpp | 13 | ||||
-rw-r--r-- | src/poller_base.cpp | 24 | ||||
-rw-r--r-- | src/poller_base.hpp | 13 | ||||
-rw-r--r-- | src/reaper.cpp | 2 | ||||
-rw-r--r-- | src/reaper.hpp | 2 | ||||
-rw-r--r-- | src/session_base.cpp | 20 | ||||
-rw-r--r-- | src/session_base.hpp | 11 | ||||
-rw-r--r-- | src/socket_base.cpp | 2 | ||||
-rw-r--r-- | src/socket_base.hpp | 2 | ||||
-rw-r--r-- | src/tcp_connecter.cpp | 18 | ||||
-rw-r--r-- | src/tcp_connecter.hpp | 8 |
22 files changed, 136 insertions, 136 deletions
diff --git a/src/io_object.cpp b/src/io_object.cpp index 730123e..abc8204 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -82,14 +82,14 @@ void xs::io_object_t::reset_pollout (handle_t handle_) poller->reset_pollout (handle_); } -void xs::io_object_t::add_timer (int timeout_, int id_) +xs::handle_t xs::io_object_t::add_timer (int timeout_) { - poller->add_timer (timeout_, this, id_); + return poller->add_timer (timeout_, this); } -void xs::io_object_t::rm_timer (int id_) +void xs::io_object_t::rm_timer (handle_t handle_) { - poller->rm_timer (this, id_); + poller->rm_timer (handle_); } void xs::io_object_t::in_event (fd_t fd_) @@ -98,11 +98,12 @@ void xs::io_object_t::in_event (fd_t fd_) } void xs::io_object_t::out_event (fd_t fd_) + { xs_assert (false); } -void xs::io_object_t::timer_event (int id_) +void xs::io_object_t::timer_event (handle_t handle_) { xs_assert (false); } diff --git a/src/io_object.hpp b/src/io_object.hpp index 3236c22..0abd457 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -57,13 +57,13 @@ namespace xs void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (int timout_, int id_); - void rm_timer (int id_); + handle_t add_timer (int timout_); + void rm_timer (handle_t handle_); // i_poll_events interface implementation. void in_event (fd_t fd_); void out_event (fd_t fd_); - void timer_event (int id_); + void timer_event (handle_t handle_); private: diff --git a/src/io_thread.cpp b/src/io_thread.cpp index c1f5849..77cebfe 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -89,7 +89,7 @@ void xs::io_thread_t::out_event (fd_t fd_) xs_assert (false); } -void xs::io_thread_t::timer_event (int id_) +void xs::io_thread_t::timer_event (handle_t handle_) { // No timers here. This function is never called. xs_assert (false); diff --git a/src/io_thread.hpp b/src/io_thread.hpp index e05ba05..379f420 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -59,7 +59,7 @@ namespace xs // i_poll_events implementation. void in_event (fd_t fd_); void out_event (fd_t fd_); - void timer_event (int id_); + void timer_event (handle_t handle_); // Used by io_objects to retrieve the assciated poller object. poller_base_t *get_poller (); diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index 4f353bc..14d4355 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -46,7 +46,8 @@ xs::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, handle_valid (false), wait (wait_), session (session_), - current_reconnect_ivl(options.reconnect_ivl) + current_reconnect_ivl(options.reconnect_ivl), + reconnect_timer (NULL) { // TODO: set_addess should be called separately, so that the error // can be propagated. @@ -56,8 +57,10 @@ xs::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, xs::ipc_connecter_t::~ipc_connecter_t () { - if (wait) - rm_timer (reconnect_timer_id); + if (wait) { + xs_assert (reconnect_timer); + rm_timer (reconnect_timer); + } if (handle_valid) rm_fd (handle); @@ -106,9 +109,10 @@ void xs::ipc_connecter_t::out_event (fd_t fd_) terminate (); } -void xs::ipc_connecter_t::timer_event (int id_) +void xs::ipc_connecter_t::timer_event (handle_t handle_) { - xs_assert (id_ == reconnect_timer_id); + xs_assert (handle_ == reconnect_timer); + reconnect_timer = NULL; wait = false; start_connecting (); } @@ -142,7 +146,8 @@ void xs::ipc_connecter_t::start_connecting () void xs::ipc_connecter_t::add_reconnect_timer() { - add_timer (get_new_reconnect_ivl(), reconnect_timer_id); + xs_assert (reconnect_timer == NULL); + reconnect_timer = add_timer (get_new_reconnect_ivl()); } int xs::ipc_connecter_t::get_new_reconnect_ivl () diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index 81edda2..02b7382 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -59,7 +59,7 @@ namespace xs // Handlers for I/O events. void in_event (fd_t fd_); void out_event (fd_t fd_); - void timer_event (int id_); + void timer_event (handle_t handle_); // Internal function to start the actual connection establishment. void start_connecting (); @@ -109,6 +109,9 @@ namespace xs // Current reconnect ivl, updated for backoff strategy int current_reconnect_ivl; + // Handle of the reconnect timer, if active. NULL otherwise. + handle_t reconnect_timer; + ipc_connecter_t (const ipc_connecter_t&); const ipc_connecter_t &operator = (const ipc_connecter_t&); }; diff --git a/src/monitor.cpp b/src/monitor.cpp index 7665005..b29c874 100644 --- a/src/monitor.cpp +++ b/src/monitor.cpp @@ -26,7 +26,8 @@ xs::monitor_t::monitor_t (xs::io_thread_t *io_thread_) : own_t (io_thread_, options_t ()), - io_object_t (io_thread_) + io_object_t (io_thread_), + timer (NULL) { } @@ -54,19 +55,20 @@ void xs::monitor_t::log (int sid_, const char *text_) void xs::monitor_t::process_plug () { // Schedule sending of the first snapshot. - add_timer (500 + (generate_random () % 1000), timer_id); + timer = add_timer (500 + (generate_random () % 1000)); } void xs::monitor_t::process_stop () { - rm_timer (timer_id); + rm_timer (timer); + timer = NULL; send_done (); delete this; } -void xs::monitor_t::timer_event (int id_) +void xs::monitor_t::timer_event (handle_t handle_) { - xs_assert (id_ == timer_id); + xs_assert (handle_ == timer); // Send the snapshot here! sync.lock (); @@ -74,5 +76,5 @@ void xs::monitor_t::timer_event (int id_) sync.unlock (); // Wait before sending next snapshot. - add_timer (500 + (generate_random () % 1000), timer_id); + timer = add_timer (500 + (generate_random () % 1000)); } diff --git a/src/monitor.hpp b/src/monitor.hpp index 8aa58ba..73bc31c 100644 --- a/src/monitor.hpp +++ b/src/monitor.hpp @@ -47,19 +47,20 @@ namespace xs private: - enum {timer_id = 0x44}; - // Handlers for incoming commands. void process_plug (); void process_stop (); // Events from the poller. - void timer_event (int id_); + void timer_event (handle_t handle_); // Actual monitoring data to send and the related critical section. std::string text; mutex_t sync; + // Handle of the timer. + handle_t timer; + monitor_t (const monitor_t&); const monitor_t &operator = (const monitor_t&); }; diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index c8aec2d..b2cc513 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -40,12 +40,12 @@ xs::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, const options_t &options_) : io_object_t (parent_), - has_rx_timer (false), pgm_socket (true, options_), options (options_), session (NULL), mru_decoder (NULL), - pending_bytes (0) + pending_bytes (0), + rx_timer (NULL) { } @@ -90,9 +90,9 @@ void xs::pgm_receiver_t::unplug () mru_decoder = NULL; pending_bytes = 0; - if (has_rx_timer) { - rm_timer (rx_timer_id); - has_rx_timer = false; + if (rx_timer) { + rm_timer (rx_timer); + rx_timer = NULL; } rm_fd (socket_handle); @@ -157,9 +157,9 @@ void xs::pgm_receiver_t::in_event (fd_t fd_) if (pending_bytes > 0) return; - if (has_rx_timer) { - rm_timer (rx_timer_id); - has_rx_timer = false; + if (rx_timer) { + rm_timer (rx_timer); + rx_timer = NULL; } // TODO: This loop can effectively block other engines in the same I/O @@ -177,8 +177,8 @@ void xs::pgm_receiver_t::in_event (fd_t fd_) if (received == 0) { if (errno == ENOMEM || errno == EBUSY) { const long timeout = pgm_socket.get_rx_timeout (); - add_timer (timeout, rx_timer_id); - has_rx_timer = true; + xs_assert (!rx_timer); + rx_timer = add_timer (timeout); } break; } @@ -250,9 +250,9 @@ void xs::pgm_receiver_t::in_event (fd_t fd_) reset_pollin (socket_handle); // Reset outstanding timer. - if (has_rx_timer) { - rm_timer (rx_timer_id); - has_rx_timer = false; + if (rx_timer) { + rm_timer (rx_timer); + rx_timer = NULL; } break; @@ -263,12 +263,10 @@ void xs::pgm_receiver_t::in_event (fd_t fd_) session->flush (); } -void xs::pgm_receiver_t::timer_event (int token) +void xs::pgm_receiver_t::timer_event (handle_t handle_) { - xs_assert (token == rx_timer_id); - - // Timer cancels on return by poller_base. - has_rx_timer = false; + xs_assert (handle_ == rx_timer); + rx_timer = NULL; in_event (retired_fd); } diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index cd767f8..b8390c1 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -66,7 +66,7 @@ namespace xs // i_poll_events interface implementation. void in_event (fd_t fd_); - void timer_event (int token); + void timer_event (handle_t handle_); private: @@ -74,12 +74,6 @@ namespace xs // the pending subscriptions. void drop_subscriptions (); - // RX timeout timer ID. - enum {rx_timer_id = 0xa1}; - - // RX timer is running. - bool has_rx_timer; - // If joined is true we are already getting messages from the peer. // It it's false, we are getting data but still we haven't seen // beginning of a message. @@ -128,6 +122,9 @@ namespace xs // Poll handle associated with engine PGM waiting pipe. handle_t pipe_handle; + // Receive timer, if active, otherwise NULL. + handle_t rx_timer; + pgm_receiver_t (const pgm_receiver_t&); const pgm_receiver_t &operator = (const pgm_receiver_t&); }; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 8637f5a..f68c044 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -41,14 +41,14 @@ xs::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, const options_t &options_) : io_object_t (parent_), - has_tx_timer (false), - has_rx_timer (false), encoder (0), pgm_socket (false, options_), options (options_), out_buffer (NULL), out_buffer_size (0), - write_size (0) + write_size (0), + rx_timer (NULL), + tx_timer (NULL) { } @@ -106,14 +106,14 @@ void xs::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) void xs::pgm_sender_t::unplug () { - if (has_rx_timer) { - rm_timer (rx_timer_id); - has_rx_timer = false; + if (rx_timer) { + rm_timer (rx_timer); + rx_timer = NULL; } - if (has_tx_timer) { - rm_timer (tx_timer_id); - has_tx_timer = false; + if (tx_timer) { + rm_timer (tx_timer); + tx_timer = NULL; } rm_fd (handle); @@ -150,17 +150,17 @@ xs::pgm_sender_t::~pgm_sender_t () void xs::pgm_sender_t::in_event (fd_t fd_) { - if (has_rx_timer) { - rm_timer (rx_timer_id); - has_rx_timer = false; + if (rx_timer) { + rm_timer (rx_timer); + rx_timer = NULL; } // In-event on sender side means NAK or SPMR receiving from some peer. pgm_socket.process_upstream (); if (errno == ENOMEM || errno == EBUSY) { const long timeout = pgm_socket.get_rx_timeout (); - add_timer (timeout, rx_timer_id); - has_rx_timer = true; + xs_assert (!rx_timer); + rx_timer = add_timer (timeout); } } @@ -189,9 +189,9 @@ void xs::pgm_sender_t::out_event (fd_t fd_) put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset); } - if (has_tx_timer) { - rm_timer (tx_timer_id); - has_tx_timer = false; + if (tx_timer) { + rm_timer (tx_timer); + tx_timer = NULL; } // Send the data. @@ -205,21 +205,21 @@ void xs::pgm_sender_t::out_event (fd_t fd_) if (errno == ENOMEM) { const long timeout = pgm_socket.get_tx_timeout (); - add_timer (timeout, tx_timer_id); - has_tx_timer = true; + xs_assert (!tx_timer); + tx_timer = add_timer (timeout); } else xs_assert (errno == EBUSY); } } -void xs::pgm_sender_t::timer_event (int token) +void xs::pgm_sender_t::timer_event (handle_t handle_) { // Timer cancels on return by poller_base. - if (token == rx_timer_id) { - has_rx_timer = false; + if (handle_ == rx_timer) { + rx_timer = NULL; in_event (retired_fd); - } else if (token == tx_timer_id) { - has_tx_timer = false; + } else if (handle_ == tx_timer) { + tx_timer = NULL; out_event (retired_fd); } else xs_assert (false); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 23c241b..000f5ba 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -65,17 +65,10 @@ namespace xs // i_poll_events interface implementation. void in_event (fd_t fd_); void out_event (fd_t fd_); - void timer_event (int token); + void timer_event (handle_t handle_); private: - // TX and RX timeout timer ID's. - enum {tx_timer_id = 0xa0, rx_timer_id = 0xa1}; - - // Timers are running. - bool has_tx_timer; - bool has_rx_timer; - // Message encoder. encoder_t encoder; @@ -101,6 +94,10 @@ namespace xs // If zero, there are no data to be sent. size_t write_size; + // Receive and send timers or NULL if not active. + handle_t rx_timer; + handle_t tx_timer; + pgm_sender_t (const pgm_sender_t&); const pgm_sender_t &operator = (const pgm_sender_t&); }; diff --git a/src/poller_base.cpp b/src/poller_base.cpp index bc03444..8bf091e 100644 --- a/src/poller_base.cpp +++ b/src/poller_base.cpp @@ -68,24 +68,20 @@ void xs::poller_base_t::adjust_load (int amount_) load.sub (-amount_); } -void xs::poller_base_t::add_timer (int timeout_, i_poll_events *sink_, int id_) +xs::handle_t xs::poller_base_t::add_timer (int timeout_, i_poll_events *sink_) { uint64_t expiration = clock.now_ms () + timeout_; - timer_info_t info = {sink_, id_}; - timers.insert (timers_t::value_type (expiration, info)); + timer_info_t info = {sink_, timers_t::iterator ()}; + timers_t::iterator it = timers.insert ( + timers_t::value_type (expiration, info)); + it->second.self = it; + return (handle_t) &(it->second); } -void xs::poller_base_t::rm_timer (i_poll_events *sink_, int id_) +void xs::poller_base_t::rm_timer (handle_t handle_) { - // Complexity of this operation is O(n). We assume it is rarely used. - for (timers_t::iterator it = timers.begin (); it != timers.end (); ++it) - if (it->second.sink == sink_ && it->second.id == id_) { - timers.erase (it); - return; - } - - // Timer not found. - xs_assert (false); + timer_info_t *info = (timer_info_t*) handle_; + timers.erase (info->self); } uint64_t xs::poller_base_t::execute_timers () @@ -109,7 +105,7 @@ uint64_t xs::poller_base_t::execute_timers () return it->first - current; // Trigger the timer. - it->second.sink->timer_event (it->second.id); + it->second.sink->timer_event ((handle_t) &it->second); // Remove it from the list of active timers. timers_t::iterator o = it; diff --git a/src/poller_base.hpp b/src/poller_base.hpp index 15e03a6..5edb5d4 100644 --- a/src/poller_base.hpp +++ b/src/poller_base.hpp @@ -47,7 +47,7 @@ namespace xs virtual void out_event (fd_t fd_) = 0; // Called when timer expires. - virtual void timer_event (int id_) = 0; + virtual void timer_event (handle_t handle_) = 0; }; class poller_base_t @@ -73,12 +73,11 @@ namespace xs virtual void stop () = 0; // Add a timeout to expire in timeout_ milliseconds. After the - // expiration timer_event on sink_ object will be called with - // argument set to id_. - void add_timer (int timeout_, xs::i_poll_events *sink_, int id_); + // expiration timer_event on sink_ object will be called. + handle_t add_timer (int timeout_, xs::i_poll_events *sink_); - // Cancel the timer created by sink_ object with ID equal to id_. - void rm_timer (xs::i_poll_events *sink_, int id_); + // Cancel the timer identified by the handle. + void rm_timer (handle_t handle_); protected: @@ -100,7 +99,7 @@ namespace xs struct timer_info_t { xs::i_poll_events *sink; - int id; + std::multimap <uint64_t, timer_info_t>::iterator self; }; typedef std::multimap <uint64_t, timer_info_t> timers_t; timers_t timers; diff --git a/src/reaper.cpp b/src/reaper.cpp index c9a5961..8dc2da6 100644 --- a/src/reaper.cpp +++ b/src/reaper.cpp @@ -78,7 +78,7 @@ void xs::reaper_t::out_event (fd_t fd_) xs_assert (false); } -void xs::reaper_t::timer_event (int id_) +void xs::reaper_t::timer_event (handle_t handle_) { xs_assert (false); } diff --git a/src/reaper.hpp b/src/reaper.hpp index c76970e..bc40fac 100644 --- a/src/reaper.hpp +++ b/src/reaper.hpp @@ -46,7 +46,7 @@ namespace xs // i_poll_events implementation. void in_event (fd_t fd_); void out_event (fd_t fd_); - void timer_event (int id_); + void timer_event (handle_t handle_); private: diff --git a/src/session_base.cpp b/src/session_base.cpp index 2bd5918..acb13b3 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -112,9 +112,9 @@ xs::session_base_t::session_base_t (class io_thread_t *io_thread_, engine (NULL), socket (socket_), io_thread (io_thread_), - has_linger_timer (false), send_identity (options_.send_identity), - recv_identity (options_.recv_identity) + recv_identity (options_.recv_identity), + linger_timer (NULL) { if (protocol_) protocol = protocol_; @@ -127,9 +127,9 @@ xs::session_base_t::~session_base_t () xs_assert (!pipe); // If there's still a pending linger timer, remove it. - if (has_linger_timer) { - rm_timer (linger_timer_id); - has_linger_timer = false; + if (linger_timer) { + rm_timer (linger_timer); + linger_timer = NULL; } // Close the engine. @@ -329,9 +329,8 @@ void xs::session_base_t::process_term (int linger_) // If linger is infinite (negative) we don't even have to set // the timer. if (linger_ > 0) { - xs_assert (!has_linger_timer); - add_timer (linger_, linger_timer_id); - has_linger_timer = true; + xs_assert (!linger_timer); + linger_timer = add_timer (linger_); } // Start pipe termination process. Delay the termination till all messages @@ -353,12 +352,11 @@ void xs::session_base_t::proceed_with_term () own_t::process_term (0); } -void xs::session_base_t::timer_event (int id_) +void xs::session_base_t::timer_event (handle_t handle_) { // Linger period expired. We can proceed with termination even though // there are still pending messages to be sent. - xs_assert (id_ == linger_timer_id); - has_linger_timer = false; + xs_assert (handle_ == linger_timer); // Ask pipe to terminate even though there may be pending messages in it. xs_assert (pipe); diff --git a/src/session_base.hpp b/src/session_base.hpp index 2360b6b..45ad8f8 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -84,7 +84,7 @@ namespace xs void process_term (int linger_); // i_poll_events handlers. - void timer_event (int id_); + void timer_event (handle_t handle_); // Remove any half processed messages. Flush unflushed messages. // Call this function when engine disconnect to get rid of leftovers. @@ -118,12 +118,6 @@ namespace xs // the engines into the same thread. xs::io_thread_t *io_thread; - // ID of the linger timer - enum {linger_timer_id = 0x20}; - - // True is linger timer is running. - bool has_linger_timer; - // If true, identity is to be sent/recvd from the network. bool send_identity; bool recv_identity; @@ -132,6 +126,9 @@ namespace xs std::string protocol; std::string address; + // Handle of the linger timer, if active, NULL otherwise. + handle_t linger_timer; + session_base_t (const session_base_t&); const session_base_t &operator = (const session_base_t&); }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 94b555b..a321a71 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -807,7 +807,7 @@ void xs::socket_base_t::out_event (fd_t fd_) xs_assert (false); } -void xs::socket_base_t::timer_event (int id_) +void xs::socket_base_t::timer_event (handle_t handle_) { xs_assert (false); } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 6aafeb2..01a1564 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -87,7 +87,7 @@ namespace xs // is handled by the poller in the reaper thread. void in_event (fd_t fd_); void out_event (fd_t fd_); - void timer_event (int id_); + void timer_event (handle_t handle_); // i_pipe_events interface implementation. void read_activated (pipe_t *pipe_); diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index fbaaf8e..2b4f657 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -55,7 +55,8 @@ xs::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, handle_valid (false), wait (wait_), session (session_), - current_reconnect_ivl(options.reconnect_ivl) + current_reconnect_ivl(options.reconnect_ivl), + reconnect_timer (NULL) { // TODO: set_addess should be called separately, so that the error // can be propagated. @@ -65,8 +66,11 @@ xs::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, xs::tcp_connecter_t::~tcp_connecter_t () { - if (wait) - rm_timer (reconnect_timer_id); + if (wait) { + xs_assert (reconnect_timer); + rm_timer (reconnect_timer); + reconnect_timer = NULL; + } if (handle_valid) rm_fd (handle); @@ -117,9 +121,10 @@ void xs::tcp_connecter_t::out_event (fd_t fd_) terminate (); } -void xs::tcp_connecter_t::timer_event (int id_) +void xs::tcp_connecter_t::timer_event (handle_t handle_) { - xs_assert (id_ == reconnect_timer_id); + xs_assert (handle_ == reconnect_timer); + reconnect_timer = NULL; wait = false; start_connecting (); } @@ -153,7 +158,8 @@ void xs::tcp_connecter_t::start_connecting () void xs::tcp_connecter_t::add_reconnect_timer() { - add_timer (get_new_reconnect_ivl(), reconnect_timer_id); + xs_assert (!reconnect_timer); + reconnect_timer = add_timer (get_new_reconnect_ivl()); } int xs::tcp_connecter_t::get_new_reconnect_ivl () diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 5791422..b86c2d5 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -47,16 +47,13 @@ namespace xs private: - // ID of the timer used to delay the reconnection. - enum {reconnect_timer_id = 1}; - // Handlers for incoming commands. void process_plug (); // Handlers for I/O events. void in_event (fd_t fd_); void out_event (fd_t fd_); - void timer_event (int id_); + void timer_event (handle_t handle_); // Internal function to start the actual connection establishment. void start_connecting (); @@ -109,6 +106,9 @@ namespace xs // Current reconnect ivl, updated for backoff strategy int current_reconnect_ivl; + // Handle to reconnect timer, if active, NULL otherwise. + handle_t reconnect_timer; + tcp_connecter_t (const tcp_connecter_t&); const tcp_connecter_t &operator = (const tcp_connecter_t&); }; |