summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:05:01 +0900
committerMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:05:01 +0900
commit2df873a435ff139cf9d1b10b666d75e6dc6da442 (patch)
tree042b0a349ca84919041fb37df7e5a3b7195d065d
parentb67f88a7d6322a293ac3e3be9d6df9f358509221 (diff)
Timers identified by dynamically generated handles
Timers are not longer identified by hard-wired IDs. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--src/io_object.cpp11
-rw-r--r--src/io_object.hpp6
-rw-r--r--src/io_thread.cpp2
-rw-r--r--src/io_thread.hpp2
-rw-r--r--src/ipc_connecter.cpp17
-rw-r--r--src/ipc_connecter.hpp5
-rw-r--r--src/monitor.cpp14
-rw-r--r--src/monitor.hpp7
-rw-r--r--src/pgm_receiver.cpp34
-rw-r--r--src/pgm_receiver.hpp11
-rw-r--r--src/pgm_sender.cpp48
-rw-r--r--src/pgm_sender.hpp13
-rw-r--r--src/poller_base.cpp24
-rw-r--r--src/poller_base.hpp13
-rw-r--r--src/reaper.cpp2
-rw-r--r--src/reaper.hpp2
-rw-r--r--src/session_base.cpp20
-rw-r--r--src/session_base.hpp11
-rw-r--r--src/socket_base.cpp2
-rw-r--r--src/socket_base.hpp2
-rw-r--r--src/tcp_connecter.cpp18
-rw-r--r--src/tcp_connecter.hpp8
22 files changed, 136 insertions, 136 deletions
diff --git a/src/io_object.cpp b/src/io_object.cpp
index 730123e..abc8204 100644
--- a/src/io_object.cpp
+++ b/src/io_object.cpp
@@ -82,14 +82,14 @@ void xs::io_object_t::reset_pollout (handle_t handle_)
poller->reset_pollout (handle_);
}
-void xs::io_object_t::add_timer (int timeout_, int id_)
+xs::handle_t xs::io_object_t::add_timer (int timeout_)
{
- poller->add_timer (timeout_, this, id_);
+ return poller->add_timer (timeout_, this);
}
-void xs::io_object_t::rm_timer (int id_)
+void xs::io_object_t::rm_timer (handle_t handle_)
{
- poller->rm_timer (this, id_);
+ poller->rm_timer (handle_);
}
void xs::io_object_t::in_event (fd_t fd_)
@@ -98,11 +98,12 @@ void xs::io_object_t::in_event (fd_t fd_)
}
void xs::io_object_t::out_event (fd_t fd_)
+
{
xs_assert (false);
}
-void xs::io_object_t::timer_event (int id_)
+void xs::io_object_t::timer_event (handle_t handle_)
{
xs_assert (false);
}
diff --git a/src/io_object.hpp b/src/io_object.hpp
index 3236c22..0abd457 100644
--- a/src/io_object.hpp
+++ b/src/io_object.hpp
@@ -57,13 +57,13 @@ namespace xs
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer (int timout_, int id_);
- void rm_timer (int id_);
+ handle_t add_timer (int timout_);
+ void rm_timer (handle_t handle_);
// i_poll_events interface implementation.
void in_event (fd_t fd_);
void out_event (fd_t fd_);
- void timer_event (int id_);
+ void timer_event (handle_t handle_);
private:
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index c1f5849..77cebfe 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -89,7 +89,7 @@ void xs::io_thread_t::out_event (fd_t fd_)
xs_assert (false);
}
-void xs::io_thread_t::timer_event (int id_)
+void xs::io_thread_t::timer_event (handle_t handle_)
{
// No timers here. This function is never called.
xs_assert (false);
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index e05ba05..379f420 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -59,7 +59,7 @@ namespace xs
// i_poll_events implementation.
void in_event (fd_t fd_);
void out_event (fd_t fd_);
- void timer_event (int id_);
+ void timer_event (handle_t handle_);
// Used by io_objects to retrieve the assciated poller object.
poller_base_t *get_poller ();
diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp
index 4f353bc..14d4355 100644
--- a/src/ipc_connecter.cpp
+++ b/src/ipc_connecter.cpp
@@ -46,7 +46,8 @@ xs::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
handle_valid (false),
wait (wait_),
session (session_),
- current_reconnect_ivl(options.reconnect_ivl)
+ current_reconnect_ivl(options.reconnect_ivl),
+ reconnect_timer (NULL)
{
// TODO: set_addess should be called separately, so that the error
// can be propagated.
@@ -56,8 +57,10 @@ xs::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
xs::ipc_connecter_t::~ipc_connecter_t ()
{
- if (wait)
- rm_timer (reconnect_timer_id);
+ if (wait) {
+ xs_assert (reconnect_timer);
+ rm_timer (reconnect_timer);
+ }
if (handle_valid)
rm_fd (handle);
@@ -106,9 +109,10 @@ void xs::ipc_connecter_t::out_event (fd_t fd_)
terminate ();
}
-void xs::ipc_connecter_t::timer_event (int id_)
+void xs::ipc_connecter_t::timer_event (handle_t handle_)
{
- xs_assert (id_ == reconnect_timer_id);
+ xs_assert (handle_ == reconnect_timer);
+ reconnect_timer = NULL;
wait = false;
start_connecting ();
}
@@ -142,7 +146,8 @@ void xs::ipc_connecter_t::start_connecting ()
void xs::ipc_connecter_t::add_reconnect_timer()
{
- add_timer (get_new_reconnect_ivl(), reconnect_timer_id);
+ xs_assert (reconnect_timer == NULL);
+ reconnect_timer = add_timer (get_new_reconnect_ivl());
}
int xs::ipc_connecter_t::get_new_reconnect_ivl ()
diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp
index 81edda2..02b7382 100644
--- a/src/ipc_connecter.hpp
+++ b/src/ipc_connecter.hpp
@@ -59,7 +59,7 @@ namespace xs
// Handlers for I/O events.
void in_event (fd_t fd_);
void out_event (fd_t fd_);
- void timer_event (int id_);
+ void timer_event (handle_t handle_);
// Internal function to start the actual connection establishment.
void start_connecting ();
@@ -109,6 +109,9 @@ namespace xs
// Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl;
+ // Handle of the reconnect timer, if active. NULL otherwise.
+ handle_t reconnect_timer;
+
ipc_connecter_t (const ipc_connecter_t&);
const ipc_connecter_t &operator = (const ipc_connecter_t&);
};
diff --git a/src/monitor.cpp b/src/monitor.cpp
index 7665005..b29c874 100644
--- a/src/monitor.cpp
+++ b/src/monitor.cpp
@@ -26,7 +26,8 @@
xs::monitor_t::monitor_t (xs::io_thread_t *io_thread_) :
own_t (io_thread_, options_t ()),
- io_object_t (io_thread_)
+ io_object_t (io_thread_),
+ timer (NULL)
{
}
@@ -54,19 +55,20 @@ void xs::monitor_t::log (int sid_, const char *text_)
void xs::monitor_t::process_plug ()
{
// Schedule sending of the first snapshot.
- add_timer (500 + (generate_random () % 1000), timer_id);
+ timer = add_timer (500 + (generate_random () % 1000));
}
void xs::monitor_t::process_stop ()
{
- rm_timer (timer_id);
+ rm_timer (timer);
+ timer = NULL;
send_done ();
delete this;
}
-void xs::monitor_t::timer_event (int id_)
+void xs::monitor_t::timer_event (handle_t handle_)
{
- xs_assert (id_ == timer_id);
+ xs_assert (handle_ == timer);
// Send the snapshot here!
sync.lock ();
@@ -74,5 +76,5 @@ void xs::monitor_t::timer_event (int id_)
sync.unlock ();
// Wait before sending next snapshot.
- add_timer (500 + (generate_random () % 1000), timer_id);
+ timer = add_timer (500 + (generate_random () % 1000));
}
diff --git a/src/monitor.hpp b/src/monitor.hpp
index 8aa58ba..73bc31c 100644
--- a/src/monitor.hpp
+++ b/src/monitor.hpp
@@ -47,19 +47,20 @@ namespace xs
private:
- enum {timer_id = 0x44};
-
// Handlers for incoming commands.
void process_plug ();
void process_stop ();
// Events from the poller.
- void timer_event (int id_);
+ void timer_event (handle_t handle_);
// Actual monitoring data to send and the related critical section.
std::string text;
mutex_t sync;
+ // Handle of the timer.
+ handle_t timer;
+
monitor_t (const monitor_t&);
const monitor_t &operator = (const monitor_t&);
};
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index c8aec2d..b2cc513 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -40,12 +40,12 @@
xs::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
const options_t &options_) :
io_object_t (parent_),
- has_rx_timer (false),
pgm_socket (true, options_),
options (options_),
session (NULL),
mru_decoder (NULL),
- pending_bytes (0)
+ pending_bytes (0),
+ rx_timer (NULL)
{
}
@@ -90,9 +90,9 @@ void xs::pgm_receiver_t::unplug ()
mru_decoder = NULL;
pending_bytes = 0;
- if (has_rx_timer) {
- rm_timer (rx_timer_id);
- has_rx_timer = false;
+ if (rx_timer) {
+ rm_timer (rx_timer);
+ rx_timer = NULL;
}
rm_fd (socket_handle);
@@ -157,9 +157,9 @@ void xs::pgm_receiver_t::in_event (fd_t fd_)
if (pending_bytes > 0)
return;
- if (has_rx_timer) {
- rm_timer (rx_timer_id);
- has_rx_timer = false;
+ if (rx_timer) {
+ rm_timer (rx_timer);
+ rx_timer = NULL;
}
// TODO: This loop can effectively block other engines in the same I/O
@@ -177,8 +177,8 @@ void xs::pgm_receiver_t::in_event (fd_t fd_)
if (received == 0) {
if (errno == ENOMEM || errno == EBUSY) {
const long timeout = pgm_socket.get_rx_timeout ();
- add_timer (timeout, rx_timer_id);
- has_rx_timer = true;
+ xs_assert (!rx_timer);
+ rx_timer = add_timer (timeout);
}
break;
}
@@ -250,9 +250,9 @@ void xs::pgm_receiver_t::in_event (fd_t fd_)
reset_pollin (socket_handle);
// Reset outstanding timer.
- if (has_rx_timer) {
- rm_timer (rx_timer_id);
- has_rx_timer = false;
+ if (rx_timer) {
+ rm_timer (rx_timer);
+ rx_timer = NULL;
}
break;
@@ -263,12 +263,10 @@ void xs::pgm_receiver_t::in_event (fd_t fd_)
session->flush ();
}
-void xs::pgm_receiver_t::timer_event (int token)
+void xs::pgm_receiver_t::timer_event (handle_t handle_)
{
- xs_assert (token == rx_timer_id);
-
- // Timer cancels on return by poller_base.
- has_rx_timer = false;
+ xs_assert (handle_ == rx_timer);
+ rx_timer = NULL;
in_event (retired_fd);
}
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index cd767f8..b8390c1 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -66,7 +66,7 @@ namespace xs
// i_poll_events interface implementation.
void in_event (fd_t fd_);
- void timer_event (int token);
+ void timer_event (handle_t handle_);
private:
@@ -74,12 +74,6 @@ namespace xs
// the pending subscriptions.
void drop_subscriptions ();
- // RX timeout timer ID.
- enum {rx_timer_id = 0xa1};
-
- // RX timer is running.
- bool has_rx_timer;
-
// If joined is true we are already getting messages from the peer.
// It it's false, we are getting data but still we haven't seen
// beginning of a message.
@@ -128,6 +122,9 @@ namespace xs
// Poll handle associated with engine PGM waiting pipe.
handle_t pipe_handle;
+ // Receive timer, if active, otherwise NULL.
+ handle_t rx_timer;
+
pgm_receiver_t (const pgm_receiver_t&);
const pgm_receiver_t &operator = (const pgm_receiver_t&);
};
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 8637f5a..f68c044 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -41,14 +41,14 @@
xs::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
const options_t &options_) :
io_object_t (parent_),
- has_tx_timer (false),
- has_rx_timer (false),
encoder (0),
pgm_socket (false, options_),
options (options_),
out_buffer (NULL),
out_buffer_size (0),
- write_size (0)
+ write_size (0),
+ rx_timer (NULL),
+ tx_timer (NULL)
{
}
@@ -106,14 +106,14 @@ void xs::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
void xs::pgm_sender_t::unplug ()
{
- if (has_rx_timer) {
- rm_timer (rx_timer_id);
- has_rx_timer = false;
+ if (rx_timer) {
+ rm_timer (rx_timer);
+ rx_timer = NULL;
}
- if (has_tx_timer) {
- rm_timer (tx_timer_id);
- has_tx_timer = false;
+ if (tx_timer) {
+ rm_timer (tx_timer);
+ tx_timer = NULL;
}
rm_fd (handle);
@@ -150,17 +150,17 @@ xs::pgm_sender_t::~pgm_sender_t ()
void xs::pgm_sender_t::in_event (fd_t fd_)
{
- if (has_rx_timer) {
- rm_timer (rx_timer_id);
- has_rx_timer = false;
+ if (rx_timer) {
+ rm_timer (rx_timer);
+ rx_timer = NULL;
}
// In-event on sender side means NAK or SPMR receiving from some peer.
pgm_socket.process_upstream ();
if (errno == ENOMEM || errno == EBUSY) {
const long timeout = pgm_socket.get_rx_timeout ();
- add_timer (timeout, rx_timer_id);
- has_rx_timer = true;
+ xs_assert (!rx_timer);
+ rx_timer = add_timer (timeout);
}
}
@@ -189,9 +189,9 @@ void xs::pgm_sender_t::out_event (fd_t fd_)
put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);
}
- if (has_tx_timer) {
- rm_timer (tx_timer_id);
- has_tx_timer = false;
+ if (tx_timer) {
+ rm_timer (tx_timer);
+ tx_timer = NULL;
}
// Send the data.
@@ -205,21 +205,21 @@ void xs::pgm_sender_t::out_event (fd_t fd_)
if (errno == ENOMEM) {
const long timeout = pgm_socket.get_tx_timeout ();
- add_timer (timeout, tx_timer_id);
- has_tx_timer = true;
+ xs_assert (!tx_timer);
+ tx_timer = add_timer (timeout);
} else
xs_assert (errno == EBUSY);
}
}
-void xs::pgm_sender_t::timer_event (int token)
+void xs::pgm_sender_t::timer_event (handle_t handle_)
{
// Timer cancels on return by poller_base.
- if (token == rx_timer_id) {
- has_rx_timer = false;
+ if (handle_ == rx_timer) {
+ rx_timer = NULL;
in_event (retired_fd);
- } else if (token == tx_timer_id) {
- has_tx_timer = false;
+ } else if (handle_ == tx_timer) {
+ tx_timer = NULL;
out_event (retired_fd);
} else
xs_assert (false);
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 23c241b..000f5ba 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -65,17 +65,10 @@ namespace xs
// i_poll_events interface implementation.
void in_event (fd_t fd_);
void out_event (fd_t fd_);
- void timer_event (int token);
+ void timer_event (handle_t handle_);
private:
- // TX and RX timeout timer ID's.
- enum {tx_timer_id = 0xa0, rx_timer_id = 0xa1};
-
- // Timers are running.
- bool has_tx_timer;
- bool has_rx_timer;
-
// Message encoder.
encoder_t encoder;
@@ -101,6 +94,10 @@ namespace xs
// If zero, there are no data to be sent.
size_t write_size;
+ // Receive and send timers or NULL if not active.
+ handle_t rx_timer;
+ handle_t tx_timer;
+
pgm_sender_t (const pgm_sender_t&);
const pgm_sender_t &operator = (const pgm_sender_t&);
};
diff --git a/src/poller_base.cpp b/src/poller_base.cpp
index bc03444..8bf091e 100644
--- a/src/poller_base.cpp
+++ b/src/poller_base.cpp
@@ -68,24 +68,20 @@ void xs::poller_base_t::adjust_load (int amount_)
load.sub (-amount_);
}
-void xs::poller_base_t::add_timer (int timeout_, i_poll_events *sink_, int id_)
+xs::handle_t xs::poller_base_t::add_timer (int timeout_, i_poll_events *sink_)
{
uint64_t expiration = clock.now_ms () + timeout_;
- timer_info_t info = {sink_, id_};
- timers.insert (timers_t::value_type (expiration, info));
+ timer_info_t info = {sink_, timers_t::iterator ()};
+ timers_t::iterator it = timers.insert (
+ timers_t::value_type (expiration, info));
+ it->second.self = it;
+ return (handle_t) &(it->second);
}
-void xs::poller_base_t::rm_timer (i_poll_events *sink_, int id_)
+void xs::poller_base_t::rm_timer (handle_t handle_)
{
- // Complexity of this operation is O(n). We assume it is rarely used.
- for (timers_t::iterator it = timers.begin (); it != timers.end (); ++it)
- if (it->second.sink == sink_ && it->second.id == id_) {
- timers.erase (it);
- return;
- }
-
- // Timer not found.
- xs_assert (false);
+ timer_info_t *info = (timer_info_t*) handle_;
+ timers.erase (info->self);
}
uint64_t xs::poller_base_t::execute_timers ()
@@ -109,7 +105,7 @@ uint64_t xs::poller_base_t::execute_timers ()
return it->first - current;
// Trigger the timer.
- it->second.sink->timer_event (it->second.id);
+ it->second.sink->timer_event ((handle_t) &it->second);
// Remove it from the list of active timers.
timers_t::iterator o = it;
diff --git a/src/poller_base.hpp b/src/poller_base.hpp
index 15e03a6..5edb5d4 100644
--- a/src/poller_base.hpp
+++ b/src/poller_base.hpp
@@ -47,7 +47,7 @@ namespace xs
virtual void out_event (fd_t fd_) = 0;
// Called when timer expires.
- virtual void timer_event (int id_) = 0;
+ virtual void timer_event (handle_t handle_) = 0;
};
class poller_base_t
@@ -73,12 +73,11 @@ namespace xs
virtual void stop () = 0;
// Add a timeout to expire in timeout_ milliseconds. After the
- // expiration timer_event on sink_ object will be called with
- // argument set to id_.
- void add_timer (int timeout_, xs::i_poll_events *sink_, int id_);
+ // expiration timer_event on sink_ object will be called.
+ handle_t add_timer (int timeout_, xs::i_poll_events *sink_);
- // Cancel the timer created by sink_ object with ID equal to id_.
- void rm_timer (xs::i_poll_events *sink_, int id_);
+ // Cancel the timer identified by the handle.
+ void rm_timer (handle_t handle_);
protected:
@@ -100,7 +99,7 @@ namespace xs
struct timer_info_t
{
xs::i_poll_events *sink;
- int id;
+ std::multimap <uint64_t, timer_info_t>::iterator self;
};
typedef std::multimap <uint64_t, timer_info_t> timers_t;
timers_t timers;
diff --git a/src/reaper.cpp b/src/reaper.cpp
index c9a5961..8dc2da6 100644
--- a/src/reaper.cpp
+++ b/src/reaper.cpp
@@ -78,7 +78,7 @@ void xs::reaper_t::out_event (fd_t fd_)
xs_assert (false);
}
-void xs::reaper_t::timer_event (int id_)
+void xs::reaper_t::timer_event (handle_t handle_)
{
xs_assert (false);
}
diff --git a/src/reaper.hpp b/src/reaper.hpp
index c76970e..bc40fac 100644
--- a/src/reaper.hpp
+++ b/src/reaper.hpp
@@ -46,7 +46,7 @@ namespace xs
// i_poll_events implementation.
void in_event (fd_t fd_);
void out_event (fd_t fd_);
- void timer_event (int id_);
+ void timer_event (handle_t handle_);
private:
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 2bd5918..acb13b3 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -112,9 +112,9 @@ xs::session_base_t::session_base_t (class io_thread_t *io_thread_,
engine (NULL),
socket (socket_),
io_thread (io_thread_),
- has_linger_timer (false),
send_identity (options_.send_identity),
- recv_identity (options_.recv_identity)
+ recv_identity (options_.recv_identity),
+ linger_timer (NULL)
{
if (protocol_)
protocol = protocol_;
@@ -127,9 +127,9 @@ xs::session_base_t::~session_base_t ()
xs_assert (!pipe);
// If there's still a pending linger timer, remove it.
- if (has_linger_timer) {
- rm_timer (linger_timer_id);
- has_linger_timer = false;
+ if (linger_timer) {
+ rm_timer (linger_timer);
+ linger_timer = NULL;
}
// Close the engine.
@@ -329,9 +329,8 @@ void xs::session_base_t::process_term (int linger_)
// If linger is infinite (negative) we don't even have to set
// the timer.
if (linger_ > 0) {
- xs_assert (!has_linger_timer);
- add_timer (linger_, linger_timer_id);
- has_linger_timer = true;
+ xs_assert (!linger_timer);
+ linger_timer = add_timer (linger_);
}
// Start pipe termination process. Delay the termination till all messages
@@ -353,12 +352,11 @@ void xs::session_base_t::proceed_with_term ()
own_t::process_term (0);
}
-void xs::session_base_t::timer_event (int id_)
+void xs::session_base_t::timer_event (handle_t handle_)
{
// Linger period expired. We can proceed with termination even though
// there are still pending messages to be sent.
- xs_assert (id_ == linger_timer_id);
- has_linger_timer = false;
+ xs_assert (handle_ == linger_timer);
// Ask pipe to terminate even though there may be pending messages in it.
xs_assert (pipe);
diff --git a/src/session_base.hpp b/src/session_base.hpp
index 2360b6b..45ad8f8 100644
--- a/src/session_base.hpp
+++ b/src/session_base.hpp
@@ -84,7 +84,7 @@ namespace xs
void process_term (int linger_);
// i_poll_events handlers.
- void timer_event (int id_);
+ void timer_event (handle_t handle_);
// Remove any half processed messages. Flush unflushed messages.
// Call this function when engine disconnect to get rid of leftovers.
@@ -118,12 +118,6 @@ namespace xs
// the engines into the same thread.
xs::io_thread_t *io_thread;
- // ID of the linger timer
- enum {linger_timer_id = 0x20};
-
- // True is linger timer is running.
- bool has_linger_timer;
-
// If true, identity is to be sent/recvd from the network.
bool send_identity;
bool recv_identity;
@@ -132,6 +126,9 @@ namespace xs
std::string protocol;
std::string address;
+ // Handle of the linger timer, if active, NULL otherwise.
+ handle_t linger_timer;
+
session_base_t (const session_base_t&);
const session_base_t &operator = (const session_base_t&);
};
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 94b555b..a321a71 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -807,7 +807,7 @@ void xs::socket_base_t::out_event (fd_t fd_)
xs_assert (false);
}
-void xs::socket_base_t::timer_event (int id_)
+void xs::socket_base_t::timer_event (handle_t handle_)
{
xs_assert (false);
}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 6aafeb2..01a1564 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -87,7 +87,7 @@ namespace xs
// is handled by the poller in the reaper thread.
void in_event (fd_t fd_);
void out_event (fd_t fd_);
- void timer_event (int id_);
+ void timer_event (handle_t handle_);
// i_pipe_events interface implementation.
void read_activated (pipe_t *pipe_);
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index fbaaf8e..2b4f657 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -55,7 +55,8 @@ xs::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
handle_valid (false),
wait (wait_),
session (session_),
- current_reconnect_ivl(options.reconnect_ivl)
+ current_reconnect_ivl(options.reconnect_ivl),
+ reconnect_timer (NULL)
{
// TODO: set_addess should be called separately, so that the error
// can be propagated.
@@ -65,8 +66,11 @@ xs::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
xs::tcp_connecter_t::~tcp_connecter_t ()
{
- if (wait)
- rm_timer (reconnect_timer_id);
+ if (wait) {
+ xs_assert (reconnect_timer);
+ rm_timer (reconnect_timer);
+ reconnect_timer = NULL;
+ }
if (handle_valid)
rm_fd (handle);
@@ -117,9 +121,10 @@ void xs::tcp_connecter_t::out_event (fd_t fd_)
terminate ();
}
-void xs::tcp_connecter_t::timer_event (int id_)
+void xs::tcp_connecter_t::timer_event (handle_t handle_)
{
- xs_assert (id_ == reconnect_timer_id);
+ xs_assert (handle_ == reconnect_timer);
+ reconnect_timer = NULL;
wait = false;
start_connecting ();
}
@@ -153,7 +158,8 @@ void xs::tcp_connecter_t::start_connecting ()
void xs::tcp_connecter_t::add_reconnect_timer()
{
- add_timer (get_new_reconnect_ivl(), reconnect_timer_id);
+ xs_assert (!reconnect_timer);
+ reconnect_timer = add_timer (get_new_reconnect_ivl());
}
int xs::tcp_connecter_t::get_new_reconnect_ivl ()
diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp
index 5791422..b86c2d5 100644
--- a/src/tcp_connecter.hpp
+++ b/src/tcp_connecter.hpp
@@ -47,16 +47,13 @@ namespace xs
private:
- // ID of the timer used to delay the reconnection.
- enum {reconnect_timer_id = 1};
-
// Handlers for incoming commands.
void process_plug ();
// Handlers for I/O events.
void in_event (fd_t fd_);
void out_event (fd_t fd_);
- void timer_event (int id_);
+ void timer_event (handle_t handle_);
// Internal function to start the actual connection establishment.
void start_connecting ();
@@ -109,6 +106,9 @@ namespace xs
// Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl;
+ // Handle to reconnect timer, if active, NULL otherwise.
+ handle_t reconnect_timer;
+
tcp_connecter_t (const tcp_connecter_t&);
const tcp_connecter_t &operator = (const tcp_connecter_t&);
};