diff options
| -rw-r--r-- | src/io_object.cpp | 11 | ||||
| -rw-r--r-- | src/io_object.hpp | 6 | ||||
| -rw-r--r-- | src/io_thread.cpp | 2 | ||||
| -rw-r--r-- | src/io_thread.hpp | 2 | ||||
| -rw-r--r-- | src/ipc_connecter.cpp | 17 | ||||
| -rw-r--r-- | src/ipc_connecter.hpp | 5 | ||||
| -rw-r--r-- | src/monitor.cpp | 14 | ||||
| -rw-r--r-- | src/monitor.hpp | 7 | ||||
| -rw-r--r-- | src/pgm_receiver.cpp | 34 | ||||
| -rw-r--r-- | src/pgm_receiver.hpp | 11 | ||||
| -rw-r--r-- | src/pgm_sender.cpp | 48 | ||||
| -rw-r--r-- | src/pgm_sender.hpp | 13 | ||||
| -rw-r--r-- | src/poller_base.cpp | 24 | ||||
| -rw-r--r-- | src/poller_base.hpp | 13 | ||||
| -rw-r--r-- | src/reaper.cpp | 2 | ||||
| -rw-r--r-- | src/reaper.hpp | 2 | ||||
| -rw-r--r-- | src/session_base.cpp | 20 | ||||
| -rw-r--r-- | src/session_base.hpp | 11 | ||||
| -rw-r--r-- | src/socket_base.cpp | 2 | ||||
| -rw-r--r-- | src/socket_base.hpp | 2 | ||||
| -rw-r--r-- | src/tcp_connecter.cpp | 18 | ||||
| -rw-r--r-- | src/tcp_connecter.hpp | 8 | 
22 files changed, 136 insertions, 136 deletions
diff --git a/src/io_object.cpp b/src/io_object.cpp index 730123e..abc8204 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -82,14 +82,14 @@ void xs::io_object_t::reset_pollout (handle_t handle_)      poller->reset_pollout (handle_);  } -void xs::io_object_t::add_timer (int timeout_, int id_) +xs::handle_t xs::io_object_t::add_timer (int timeout_)  { -    poller->add_timer (timeout_, this, id_); +    return poller->add_timer (timeout_, this);  } -void xs::io_object_t::rm_timer (int id_) +void xs::io_object_t::rm_timer (handle_t handle_)  { -    poller->rm_timer (this, id_); +    poller->rm_timer (handle_);  }  void xs::io_object_t::in_event (fd_t fd_) @@ -98,11 +98,12 @@ void xs::io_object_t::in_event (fd_t fd_)  }  void xs::io_object_t::out_event (fd_t fd_) +  {      xs_assert (false);  } -void xs::io_object_t::timer_event (int id_) +void xs::io_object_t::timer_event (handle_t handle_)  {      xs_assert (false);  } diff --git a/src/io_object.hpp b/src/io_object.hpp index 3236c22..0abd457 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -57,13 +57,13 @@ namespace xs          void reset_pollin (handle_t handle_);          void set_pollout (handle_t handle_);          void reset_pollout (handle_t handle_); -        void add_timer (int timout_, int id_); -        void rm_timer (int id_); +        handle_t add_timer (int timout_); +        void rm_timer (handle_t handle_);          //  i_poll_events interface implementation.          void in_event (fd_t fd_);          void out_event (fd_t fd_); -        void timer_event (int id_); +        void timer_event (handle_t handle_);      private: diff --git a/src/io_thread.cpp b/src/io_thread.cpp index c1f5849..77cebfe 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -89,7 +89,7 @@ void xs::io_thread_t::out_event (fd_t fd_)      xs_assert (false);  } -void xs::io_thread_t::timer_event (int id_) +void xs::io_thread_t::timer_event (handle_t handle_)  {      //  No timers here. This function is never called.      xs_assert (false); diff --git a/src/io_thread.hpp b/src/io_thread.hpp index e05ba05..379f420 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -59,7 +59,7 @@ namespace xs          //  i_poll_events implementation.          void in_event (fd_t fd_);          void out_event (fd_t fd_); -        void timer_event (int id_); +        void timer_event (handle_t handle_);          //  Used by io_objects to retrieve the assciated poller object.          poller_base_t *get_poller (); diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index 4f353bc..14d4355 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -46,7 +46,8 @@ xs::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,      handle_valid (false),      wait (wait_),      session (session_), -    current_reconnect_ivl(options.reconnect_ivl) +    current_reconnect_ivl(options.reconnect_ivl), +    reconnect_timer (NULL)  {      //  TODO: set_addess should be called separately, so that the error      //  can be propagated. @@ -56,8 +57,10 @@ xs::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,  xs::ipc_connecter_t::~ipc_connecter_t ()  { -    if (wait) -        rm_timer (reconnect_timer_id); +    if (wait) { +        xs_assert (reconnect_timer); +        rm_timer (reconnect_timer); +    }      if (handle_valid)          rm_fd (handle); @@ -106,9 +109,10 @@ void xs::ipc_connecter_t::out_event (fd_t fd_)      terminate ();  } -void xs::ipc_connecter_t::timer_event (int id_) +void xs::ipc_connecter_t::timer_event (handle_t handle_)  { -    xs_assert (id_ == reconnect_timer_id); +    xs_assert (handle_ == reconnect_timer); +    reconnect_timer = NULL;      wait = false;      start_connecting ();  } @@ -142,7 +146,8 @@ void xs::ipc_connecter_t::start_connecting ()  void xs::ipc_connecter_t::add_reconnect_timer()  { -    add_timer (get_new_reconnect_ivl(), reconnect_timer_id); +    xs_assert (reconnect_timer == NULL); +    reconnect_timer = add_timer (get_new_reconnect_ivl());  }  int xs::ipc_connecter_t::get_new_reconnect_ivl () diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index 81edda2..02b7382 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -59,7 +59,7 @@ namespace xs          //  Handlers for I/O events.          void in_event (fd_t fd_);          void out_event (fd_t fd_); -        void timer_event (int id_); +        void timer_event (handle_t handle_);          //  Internal function to start the actual connection establishment.          void start_connecting (); @@ -109,6 +109,9 @@ namespace xs          //  Current reconnect ivl, updated for backoff strategy          int current_reconnect_ivl; +        //  Handle of the reconnect timer, if active. NULL otherwise. +        handle_t reconnect_timer; +          ipc_connecter_t (const ipc_connecter_t&);          const ipc_connecter_t &operator = (const ipc_connecter_t&);      }; diff --git a/src/monitor.cpp b/src/monitor.cpp index 7665005..b29c874 100644 --- a/src/monitor.cpp +++ b/src/monitor.cpp @@ -26,7 +26,8 @@  xs::monitor_t::monitor_t (xs::io_thread_t *io_thread_) :      own_t (io_thread_, options_t ()), -    io_object_t (io_thread_) +    io_object_t (io_thread_), +    timer (NULL)  {  } @@ -54,19 +55,20 @@ void xs::monitor_t::log (int sid_, const char *text_)  void xs::monitor_t::process_plug ()  {      //  Schedule sending of the first snapshot. -    add_timer (500 + (generate_random () % 1000), timer_id); +    timer = add_timer (500 + (generate_random () % 1000));  }  void xs::monitor_t::process_stop ()  { -    rm_timer (timer_id); +    rm_timer (timer); +    timer = NULL;      send_done ();      delete this;  } -void xs::monitor_t::timer_event (int id_) +void xs::monitor_t::timer_event (handle_t handle_)  { -    xs_assert (id_ == timer_id); +    xs_assert (handle_ == timer);      //  Send the snapshot here!      sync.lock (); @@ -74,5 +76,5 @@ void xs::monitor_t::timer_event (int id_)      sync.unlock ();      //  Wait before sending next snapshot. -    add_timer (500 + (generate_random () % 1000), timer_id); +    timer = add_timer (500 + (generate_random () % 1000));  } diff --git a/src/monitor.hpp b/src/monitor.hpp index 8aa58ba..73bc31c 100644 --- a/src/monitor.hpp +++ b/src/monitor.hpp @@ -47,19 +47,20 @@ namespace xs      private: -        enum {timer_id = 0x44}; -          //  Handlers for incoming commands.          void process_plug ();          void process_stop ();          //  Events from the poller. -        void timer_event (int id_); +        void timer_event (handle_t handle_);          //  Actual monitoring data to send and the related critical section.          std::string text;          mutex_t sync; +        //  Handle of the timer. +        handle_t timer; +          monitor_t (const monitor_t&);          const monitor_t &operator = (const monitor_t&);      }; diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index c8aec2d..b2cc513 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -40,12 +40,12 @@  xs::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,         const options_t &options_) :      io_object_t (parent_), -    has_rx_timer (false),      pgm_socket (true, options_),      options (options_),      session (NULL),      mru_decoder (NULL), -    pending_bytes (0) +    pending_bytes (0), +    rx_timer (NULL)  {  } @@ -90,9 +90,9 @@ void xs::pgm_receiver_t::unplug ()      mru_decoder = NULL;      pending_bytes = 0; -    if (has_rx_timer) { -        rm_timer (rx_timer_id); -        has_rx_timer = false; +    if (rx_timer) { +        rm_timer (rx_timer); +        rx_timer = NULL;      }      rm_fd (socket_handle); @@ -157,9 +157,9 @@ void xs::pgm_receiver_t::in_event (fd_t fd_)      if (pending_bytes > 0)          return; -    if (has_rx_timer) { -        rm_timer (rx_timer_id); -        has_rx_timer = false; +    if (rx_timer) { +        rm_timer (rx_timer); +        rx_timer = NULL;      }      //  TODO: This loop can effectively block other engines in the same I/O @@ -177,8 +177,8 @@ void xs::pgm_receiver_t::in_event (fd_t fd_)          if (received == 0) {              if (errno == ENOMEM || errno == EBUSY) {                  const long timeout = pgm_socket.get_rx_timeout (); -                add_timer (timeout, rx_timer_id); -                has_rx_timer = true; +                xs_assert (!rx_timer); +                rx_timer = add_timer (timeout);              }              break;          } @@ -250,9 +250,9 @@ void xs::pgm_receiver_t::in_event (fd_t fd_)              reset_pollin (socket_handle);              //  Reset outstanding timer. -            if (has_rx_timer) { -                rm_timer (rx_timer_id); -                has_rx_timer = false; +            if (rx_timer) { +                rm_timer (rx_timer); +                rx_timer = NULL;              }              break; @@ -263,12 +263,10 @@ void xs::pgm_receiver_t::in_event (fd_t fd_)      session->flush ();  } -void xs::pgm_receiver_t::timer_event (int token) +void xs::pgm_receiver_t::timer_event (handle_t handle_)  { -    xs_assert (token == rx_timer_id); - -    //  Timer cancels on return by poller_base. -    has_rx_timer = false; +    xs_assert (handle_ == rx_timer); +    rx_timer = NULL;      in_event (retired_fd);  } diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index cd767f8..b8390c1 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -66,7 +66,7 @@ namespace xs          //  i_poll_events interface implementation.          void in_event (fd_t fd_); -        void timer_event (int token); +        void timer_event (handle_t handle_);      private: @@ -74,12 +74,6 @@ namespace xs          //  the pending subscriptions.          void drop_subscriptions (); -        //  RX timeout timer ID. -        enum {rx_timer_id = 0xa1}; - -        //  RX timer is running. -        bool has_rx_timer; -          //  If joined is true we are already getting messages from the peer.          //  It it's false, we are getting data but still we haven't seen          //  beginning of a message. @@ -128,6 +122,9 @@ namespace xs          //  Poll handle associated with engine PGM waiting pipe.          handle_t pipe_handle; +        //  Receive timer, if active, otherwise NULL. +        handle_t rx_timer; +          pgm_receiver_t (const pgm_receiver_t&);          const pgm_receiver_t &operator = (const pgm_receiver_t&);      }; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 8637f5a..f68c044 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -41,14 +41,14 @@  xs::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,         const options_t &options_) :      io_object_t (parent_), -    has_tx_timer (false), -    has_rx_timer (false),      encoder (0),      pgm_socket (false, options_),      options (options_),      out_buffer (NULL),      out_buffer_size (0), -    write_size (0) +    write_size (0), +    rx_timer (NULL), +    tx_timer (NULL)  {  } @@ -106,14 +106,14 @@ void xs::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)  void xs::pgm_sender_t::unplug ()  { -    if (has_rx_timer) { -        rm_timer (rx_timer_id); -        has_rx_timer = false; +    if (rx_timer) { +        rm_timer (rx_timer); +        rx_timer = NULL;      } -    if (has_tx_timer) { -        rm_timer (tx_timer_id); -        has_tx_timer = false; +    if (tx_timer) { +        rm_timer (tx_timer); +        tx_timer = NULL;      }      rm_fd (handle); @@ -150,17 +150,17 @@ xs::pgm_sender_t::~pgm_sender_t ()  void xs::pgm_sender_t::in_event (fd_t fd_)  { -    if (has_rx_timer) { -        rm_timer (rx_timer_id); -        has_rx_timer = false; +    if (rx_timer) { +        rm_timer (rx_timer); +        rx_timer = NULL;      }      //  In-event on sender side means NAK or SPMR receiving from some peer.      pgm_socket.process_upstream ();      if (errno == ENOMEM || errno == EBUSY) {          const long timeout = pgm_socket.get_rx_timeout (); -        add_timer (timeout, rx_timer_id); -        has_rx_timer = true; +        xs_assert (!rx_timer); +        rx_timer = add_timer (timeout);      }  } @@ -189,9 +189,9 @@ void xs::pgm_sender_t::out_event (fd_t fd_)          put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);      } -    if (has_tx_timer) { -        rm_timer (tx_timer_id); -        has_tx_timer = false; +    if (tx_timer) { +        rm_timer (tx_timer); +        tx_timer = NULL;      }      //  Send the data. @@ -205,21 +205,21 @@ void xs::pgm_sender_t::out_event (fd_t fd_)          if (errno == ENOMEM) {              const long timeout = pgm_socket.get_tx_timeout (); -            add_timer (timeout, tx_timer_id); -            has_tx_timer = true; +            xs_assert (!tx_timer); +            tx_timer = add_timer (timeout);          } else              xs_assert (errno == EBUSY);      }  } -void xs::pgm_sender_t::timer_event (int token) +void xs::pgm_sender_t::timer_event (handle_t handle_)  {      //  Timer cancels on return by poller_base. -    if (token == rx_timer_id) { -        has_rx_timer = false; +    if (handle_ == rx_timer) { +        rx_timer = NULL;          in_event (retired_fd); -    } else if (token == tx_timer_id) { -        has_tx_timer = false; +    } else if (handle_ == tx_timer) { +        tx_timer = NULL;          out_event (retired_fd);      } else          xs_assert (false); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 23c241b..000f5ba 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -65,17 +65,10 @@ namespace xs          //  i_poll_events interface implementation.          void in_event (fd_t fd_);          void out_event (fd_t fd_); -        void timer_event (int token); +        void timer_event (handle_t handle_);      private: -        //  TX and RX timeout timer ID's. -        enum {tx_timer_id = 0xa0, rx_timer_id = 0xa1}; - -        //  Timers are running. -        bool has_tx_timer; -        bool has_rx_timer; -          //  Message encoder.          encoder_t encoder; @@ -101,6 +94,10 @@ namespace xs          //  If zero, there are no data to be sent.          size_t write_size; +        //  Receive and send timers or NULL if not active. +        handle_t rx_timer; +        handle_t tx_timer; +          pgm_sender_t (const pgm_sender_t&);          const pgm_sender_t &operator = (const pgm_sender_t&);      }; diff --git a/src/poller_base.cpp b/src/poller_base.cpp index bc03444..8bf091e 100644 --- a/src/poller_base.cpp +++ b/src/poller_base.cpp @@ -68,24 +68,20 @@ void xs::poller_base_t::adjust_load (int amount_)          load.sub (-amount_);  } -void xs::poller_base_t::add_timer (int timeout_, i_poll_events *sink_, int id_) +xs::handle_t xs::poller_base_t::add_timer (int timeout_, i_poll_events *sink_)  {      uint64_t expiration = clock.now_ms () + timeout_; -    timer_info_t info = {sink_, id_}; -    timers.insert (timers_t::value_type (expiration, info)); +    timer_info_t info = {sink_, timers_t::iterator ()}; +    timers_t::iterator it = timers.insert ( +        timers_t::value_type (expiration, info)); +    it->second.self = it; +    return (handle_t) &(it->second);  } -void xs::poller_base_t::rm_timer (i_poll_events *sink_, int id_) +void xs::poller_base_t::rm_timer (handle_t handle_)  { -    //  Complexity of this operation is O(n). We assume it is rarely used. -    for (timers_t::iterator it = timers.begin (); it != timers.end (); ++it) -        if (it->second.sink == sink_ && it->second.id == id_) { -            timers.erase (it); -            return; -        } - -    //  Timer not found. -    xs_assert (false); +    timer_info_t *info = (timer_info_t*) handle_; +    timers.erase (info->self);  }  uint64_t xs::poller_base_t::execute_timers () @@ -109,7 +105,7 @@ uint64_t xs::poller_base_t::execute_timers ()              return it->first - current;          //  Trigger the timer. -        it->second.sink->timer_event (it->second.id); +        it->second.sink->timer_event ((handle_t) &it->second);          //  Remove it from the list of active timers.          timers_t::iterator o = it; diff --git a/src/poller_base.hpp b/src/poller_base.hpp index 15e03a6..5edb5d4 100644 --- a/src/poller_base.hpp +++ b/src/poller_base.hpp @@ -47,7 +47,7 @@ namespace xs          virtual void out_event (fd_t fd_) = 0;          // Called when timer expires. -        virtual void timer_event (int id_) = 0; +        virtual void timer_event (handle_t handle_) = 0;      };      class poller_base_t @@ -73,12 +73,11 @@ namespace xs          virtual void stop () = 0;          //  Add a timeout to expire in timeout_ milliseconds. After the -        //  expiration timer_event on sink_ object will be called with -        //  argument set to id_. -        void add_timer (int timeout_, xs::i_poll_events *sink_, int id_); +        //  expiration timer_event on sink_ object will be called. +        handle_t add_timer (int timeout_, xs::i_poll_events *sink_); -        //  Cancel the timer created by sink_ object with ID equal to id_. -        void rm_timer (xs::i_poll_events *sink_, int id_); +        //  Cancel the timer identified by the handle. +        void rm_timer (handle_t handle_);      protected: @@ -100,7 +99,7 @@ namespace xs          struct timer_info_t          {              xs::i_poll_events *sink; -            int id; +            std::multimap <uint64_t, timer_info_t>::iterator self;          };          typedef std::multimap <uint64_t, timer_info_t> timers_t;          timers_t timers; diff --git a/src/reaper.cpp b/src/reaper.cpp index c9a5961..8dc2da6 100644 --- a/src/reaper.cpp +++ b/src/reaper.cpp @@ -78,7 +78,7 @@ void xs::reaper_t::out_event (fd_t fd_)      xs_assert (false);  } -void xs::reaper_t::timer_event (int id_) +void xs::reaper_t::timer_event (handle_t handle_)  {      xs_assert (false);  } diff --git a/src/reaper.hpp b/src/reaper.hpp index c76970e..bc40fac 100644 --- a/src/reaper.hpp +++ b/src/reaper.hpp @@ -46,7 +46,7 @@ namespace xs          //  i_poll_events implementation.          void in_event (fd_t fd_);          void out_event (fd_t fd_); -        void timer_event (int id_); +        void timer_event (handle_t handle_);      private: diff --git a/src/session_base.cpp b/src/session_base.cpp index 2bd5918..acb13b3 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -112,9 +112,9 @@ xs::session_base_t::session_base_t (class io_thread_t *io_thread_,      engine (NULL),      socket (socket_),      io_thread (io_thread_), -    has_linger_timer (false),      send_identity (options_.send_identity), -    recv_identity (options_.recv_identity) +    recv_identity (options_.recv_identity), +    linger_timer (NULL)  {      if (protocol_)          protocol = protocol_; @@ -127,9 +127,9 @@ xs::session_base_t::~session_base_t ()      xs_assert (!pipe);      //  If there's still a pending linger timer, remove it. -    if (has_linger_timer) { -        rm_timer (linger_timer_id); -        has_linger_timer = false; +    if (linger_timer) { +        rm_timer (linger_timer); +        linger_timer = NULL;      }      //  Close the engine. @@ -329,9 +329,8 @@ void xs::session_base_t::process_term (int linger_)      //  If linger is infinite (negative) we don't even have to set      //  the timer.      if (linger_ > 0) { -        xs_assert (!has_linger_timer); -        add_timer (linger_, linger_timer_id); -        has_linger_timer = true; +        xs_assert (!linger_timer); +        linger_timer = add_timer (linger_);      }      //  Start pipe termination process. Delay the termination till all messages @@ -353,12 +352,11 @@ void xs::session_base_t::proceed_with_term ()      own_t::process_term (0);  } -void xs::session_base_t::timer_event (int id_) +void xs::session_base_t::timer_event (handle_t handle_)  {      //  Linger period expired. We can proceed with termination even though      //  there are still pending messages to be sent. -    xs_assert (id_ == linger_timer_id); -    has_linger_timer = false; +    xs_assert (handle_ == linger_timer);      //  Ask pipe to terminate even though there may be pending messages in it.      xs_assert (pipe); diff --git a/src/session_base.hpp b/src/session_base.hpp index 2360b6b..45ad8f8 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -84,7 +84,7 @@ namespace xs          void process_term (int linger_);          //  i_poll_events handlers. -        void timer_event (int id_); +        void timer_event (handle_t handle_);          //  Remove any half processed messages. Flush unflushed messages.          //  Call this function when engine disconnect to get rid of leftovers. @@ -118,12 +118,6 @@ namespace xs          //  the engines into the same thread.          xs::io_thread_t *io_thread; -        //  ID of the linger timer -        enum {linger_timer_id = 0x20}; - -        //  True is linger timer is running. -        bool has_linger_timer; -          //  If true, identity is to be sent/recvd from the network.          bool send_identity;          bool recv_identity; @@ -132,6 +126,9 @@ namespace xs          std::string protocol;          std::string address; +        //  Handle of the linger timer, if active, NULL otherwise. +        handle_t linger_timer; +          session_base_t (const session_base_t&);          const session_base_t &operator = (const session_base_t&);      }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 94b555b..a321a71 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -807,7 +807,7 @@ void xs::socket_base_t::out_event (fd_t fd_)      xs_assert (false);  } -void xs::socket_base_t::timer_event (int id_) +void xs::socket_base_t::timer_event (handle_t handle_)  {      xs_assert (false);  } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 6aafeb2..01a1564 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -87,7 +87,7 @@ namespace xs          //  is handled by the poller in the reaper thread.          void in_event (fd_t fd_);          void out_event (fd_t fd_); -        void timer_event (int id_); +        void timer_event (handle_t handle_);          //  i_pipe_events interface implementation.          void read_activated (pipe_t *pipe_); diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index fbaaf8e..2b4f657 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -55,7 +55,8 @@ xs::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,      handle_valid (false),      wait (wait_),      session (session_), -    current_reconnect_ivl(options.reconnect_ivl) +    current_reconnect_ivl(options.reconnect_ivl), +    reconnect_timer (NULL)  {      //  TODO: set_addess should be called separately, so that the error      //  can be propagated. @@ -65,8 +66,11 @@ xs::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,  xs::tcp_connecter_t::~tcp_connecter_t ()  { -    if (wait) -        rm_timer (reconnect_timer_id); +    if (wait) { +        xs_assert (reconnect_timer); +        rm_timer (reconnect_timer); +        reconnect_timer = NULL; +    }      if (handle_valid)          rm_fd (handle); @@ -117,9 +121,10 @@ void xs::tcp_connecter_t::out_event (fd_t fd_)      terminate ();  } -void xs::tcp_connecter_t::timer_event (int id_) +void xs::tcp_connecter_t::timer_event (handle_t handle_)  { -    xs_assert (id_ == reconnect_timer_id); +    xs_assert (handle_ == reconnect_timer); +    reconnect_timer = NULL;      wait = false;      start_connecting ();  } @@ -153,7 +158,8 @@ void xs::tcp_connecter_t::start_connecting ()  void xs::tcp_connecter_t::add_reconnect_timer()  { -    add_timer (get_new_reconnect_ivl(), reconnect_timer_id); +    xs_assert (!reconnect_timer); +    reconnect_timer = add_timer (get_new_reconnect_ivl());  }  int xs::tcp_connecter_t::get_new_reconnect_ivl () diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 5791422..b86c2d5 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -47,16 +47,13 @@ namespace xs      private: -        //  ID of the timer used to delay the reconnection. -        enum {reconnect_timer_id = 1}; -          //  Handlers for incoming commands.          void process_plug ();          //  Handlers for I/O events.          void in_event (fd_t fd_);          void out_event (fd_t fd_); -        void timer_event (int id_); +        void timer_event (handle_t handle_);          //  Internal function to start the actual connection establishment.          void start_connecting (); @@ -109,6 +106,9 @@ namespace xs          //  Current reconnect ivl, updated for backoff strategy          int current_reconnect_ivl; +        //  Handle to reconnect timer, if active, NULL otherwise. +        handle_t reconnect_timer; +          tcp_connecter_t (const tcp_connecter_t&);          const tcp_connecter_t &operator = (const tcp_connecter_t&);      };  | 
