diff options
| -rw-r--r-- | src/config.hpp | 4 | ||||
| -rw-r--r-- | src/devpoll.cpp | 6 | ||||
| -rw-r--r-- | src/devpoll.hpp | 7 | ||||
| -rw-r--r-- | src/epoll.cpp | 6 | ||||
| -rw-r--r-- | src/epoll.hpp | 4 | ||||
| -rw-r--r-- | src/i_poll_events.hpp | 2 | ||||
| -rw-r--r-- | src/io_object.cpp | 10 | ||||
| -rw-r--r-- | src/io_object.hpp | 6 | ||||
| -rw-r--r-- | src/io_thread.cpp | 2 | ||||
| -rw-r--r-- | src/io_thread.hpp | 2 | ||||
| -rw-r--r-- | src/kqueue.cpp | 6 | ||||
| -rw-r--r-- | src/kqueue.hpp | 4 | ||||
| -rw-r--r-- | src/poll.cpp | 6 | ||||
| -rw-r--r-- | src/poll.hpp | 4 | ||||
| -rw-r--r-- | src/select.cpp | 6 | ||||
| -rw-r--r-- | src/select.hpp | 4 | ||||
| -rw-r--r-- | src/zmq_connecter.cpp | 8 | ||||
| -rw-r--r-- | src/zmq_connecter.hpp | 3 | 
18 files changed, 48 insertions, 42 deletions
diff --git a/src/config.hpp b/src/config.hpp index 81b5a2a..bceb41c 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -69,9 +69,13 @@ namespace zmq          //  Maximum number of events the I/O thread can process in one go.          max_io_events = 256, +//  TODO: To be removed          //  Maximal wait time for a timer (milliseconds).          max_timer_period = 100, +        //  How long to wait (milliseconds) till reattempting to connect. +        reconnect_period = 100, +          //  Maximal delay to process command in API thread (in CPU ticks).          //  3,000,000 ticks equals to 1 - 2 milliseconds on current CPUs.          //  Note that delay is only applied when there is continuous stream of diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 003f465..7054c2b 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -128,12 +128,12 @@ void zmq::devpoll_t::reset_pollout (handle_t handle_)      devpoll_ctl (handle_, fd_table [handle_].events);  } -void zmq::devpoll_t::add_timer (i_poll_events *events_) +void zmq::devpoll_t::add_timer (int timeout_, i_poll_events *events_, int id_)  {       timers.push_back (events_);  } -void zmq::devpoll_t::cancel_timer (i_poll_events *events_) +void zmq::devpoll_t::cancel_timer (i_poll_events *events_, int id_)  {      timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);      if (it != timers.end ()) @@ -190,7 +190,7 @@ void zmq::devpoll_t::loop ()              //  Trigger all the timers.              for (timers_t::iterator it = t.begin (); it != t.end (); it ++) -                (*it)->timer_event (); +                (*it)->timer_event (-1);              continue;          } diff --git a/src/devpoll.hpp b/src/devpoll.hpp index 019d268..00be385 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -33,8 +33,7 @@  namespace zmq  { -    //  Implements socket polling mechanism using the Solaris-specific -    //  "/dev/poll" interface. +    //  Implements socket polling mechanism using the "/dev/poll" interface.      class devpoll_t      { @@ -52,8 +51,8 @@ namespace zmq          void reset_pollin (handle_t handle_);          void set_pollout (handle_t handle_);          void reset_pollout (handle_t handle_); -        void add_timer (struct i_poll_events *events_); -        void cancel_timer (struct i_poll_events *events_); +        void add_timer (int timeout_, struct i_poll_events *events_, int id_); +        void cancel_timer (struct i_poll_events *events_, int id_);          int get_load ();          void start ();          void stop (); diff --git a/src/epoll.cpp b/src/epoll.cpp index e22eb8c..bbad639 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -120,12 +120,12 @@ void zmq::epoll_t::reset_pollout (handle_t handle_)      errno_assert (rc != -1);  } -void zmq::epoll_t::add_timer (i_poll_events *events_) +void zmq::epoll_t::add_timer (int timeout_, i_poll_events *events_, int id_)  {       timers.push_back (events_);  } -void zmq::epoll_t::cancel_timer (i_poll_events *events_) +void zmq::epoll_t::cancel_timer (i_poll_events *events_, int id_)  {      timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);      if (it == timers.end ()) @@ -175,7 +175,7 @@ void zmq::epoll_t::loop ()              //  Trigger all the timers.              for (timers_t::iterator it = t.begin (); it != t.end (); it ++) -                (*it)->timer_event (); +                (*it)->timer_event (-1);              continue;          } diff --git a/src/epoll.hpp b/src/epoll.hpp index 38175cb..a68f055 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -53,8 +53,8 @@ namespace zmq          void reset_pollin (handle_t handle_);          void set_pollout (handle_t handle_);          void reset_pollout (handle_t handle_); -        void add_timer (struct i_poll_events *events_); -        void cancel_timer (struct i_poll_events *events_); +        void add_timer (int timeout_, struct i_poll_events *events_, int id_); +        void cancel_timer (struct i_poll_events *events_, int id_);          int get_load ();          void start ();          void stop (); diff --git a/src/i_poll_events.hpp b/src/i_poll_events.hpp index 6d474b2..8e70921 100644 --- a/src/i_poll_events.hpp +++ b/src/i_poll_events.hpp @@ -37,7 +37,7 @@ namespace zmq          virtual void out_event () = 0;          // Called when timer expires. -        virtual void timer_event () = 0; +        virtual void timer_event (int id_) = 0;      };  } diff --git a/src/io_object.cpp b/src/io_object.cpp index b3b45ee..d2620a6 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -80,14 +80,14 @@ void zmq::io_object_t::reset_pollout (handle_t handle_)      poller->reset_pollout (handle_);  } -void zmq::io_object_t::add_timer () +void zmq::io_object_t::add_timer (int timeout_, int id_)  { -    poller->add_timer (this); +    poller->add_timer (timeout_, this, id_);  } -void zmq::io_object_t::cancel_timer () +void zmq::io_object_t::cancel_timer (int id_)  { -    poller->cancel_timer (this); +    poller->cancel_timer (this, id_);  }  void zmq::io_object_t::in_event () @@ -100,7 +100,7 @@ void zmq::io_object_t::out_event ()      zmq_assert (false);  } -void zmq::io_object_t::timer_event () +void zmq::io_object_t::timer_event (int id_)  {      zmq_assert (false);  } diff --git a/src/io_object.hpp b/src/io_object.hpp index 284e6d1..ba69acc 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -56,13 +56,13 @@ namespace zmq          void reset_pollin (handle_t handle_);          void set_pollout (handle_t handle_);          void reset_pollout (handle_t handle_); -        void add_timer (); -        void cancel_timer (); +        void add_timer (int timout_, int id_); +        void cancel_timer (int id_);          //  i_poll_events interface implementation.          void in_event ();          void out_event (); -        void timer_event (); +        void timer_event (int id_);      private: diff --git a/src/io_thread.cpp b/src/io_thread.cpp index d1d95f3..05a5eb2 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -89,7 +89,7 @@ void zmq::io_thread_t::out_event ()      zmq_assert (false);  } -void zmq::io_thread_t::timer_event () +void zmq::io_thread_t::timer_event (int id_)  {      //  No timers here. This function is never called.      zmq_assert (false); diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 9e7c2ea..20d4ae3 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -56,7 +56,7 @@ namespace zmq          //  i_poll_events implementation.          void in_event ();          void out_event (); -        void timer_event (); +        void timer_event (int id_);          //  Used by io_objects to retrieve the assciated poller object.          poller_t *get_poller (); diff --git a/src/kqueue.cpp b/src/kqueue.cpp index e1fe2fa..f76a08f 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -132,12 +132,12 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_)      kevent_delete (pe->fd, EVFILT_WRITE);  } -void zmq::kqueue_t::add_timer (i_poll_events *events_) +void zmq::kqueue_t::add_timer (int timeout_, i_poll_events *events_, int id_)  {       timers.push_back (events_);  } -void zmq::kqueue_t::cancel_timer (i_poll_events *events_) +void zmq::kqueue_t::cancel_timer (i_poll_events *events_, int id_)  {      timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);      if (it != timers.end ()) @@ -186,7 +186,7 @@ void zmq::kqueue_t::loop ()              //  Trigger all the timers.              for (timers_t::iterator it = t.begin (); it != t.end (); it ++) -                (*it)->timer_event (); +                (*it)->timer_event (-1);              continue;          } diff --git a/src/kqueue.hpp b/src/kqueue.hpp index ac28a7d..43c2a39 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -53,8 +53,8 @@ namespace zmq          void reset_pollin (handle_t handle_);          void set_pollout (handle_t handle_);          void reset_pollout (handle_t handle_); -        void add_timer (struct i_poll_events *events_); -        void cancel_timer (struct i_poll_events *events_); +        void add_timer (int timeout_, struct i_poll_events *events_, int id_); +        void cancel_timer (struct i_poll_events *events_, int id_);          int get_load ();          void start ();          void stop (); diff --git a/src/poll.cpp b/src/poll.cpp index 1b203db..513c405 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -112,12 +112,12 @@ void zmq::poll_t::reset_pollout (handle_t handle_)      pollset [index].events &= ~((short) POLLOUT);  } -void zmq::poll_t::add_timer (i_poll_events *events_) +void zmq::poll_t::add_timer (int timeout_, i_poll_events *events_, int id_)  {       timers.push_back (events_);  } -void zmq::poll_t::cancel_timer (i_poll_events *events_) +void zmq::poll_t::cancel_timer (i_poll_events *events_, int id_)  {      timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);      if (it != timers.end ()) @@ -160,7 +160,7 @@ void zmq::poll_t::loop ()              //  Trigger all the timers.              for (timers_t::iterator it = t.begin (); it != t.end (); it ++) -                (*it)->timer_event (); +                (*it)->timer_event (-1);              continue;          } diff --git a/src/poll.hpp b/src/poll.hpp index f4ae35a..96d18dd 100644 --- a/src/poll.hpp +++ b/src/poll.hpp @@ -58,8 +58,8 @@ namespace zmq          void reset_pollin (handle_t handle_);          void set_pollout (handle_t handle_);          void reset_pollout (handle_t handle_); -        void add_timer (struct i_poll_events *events_); -        void cancel_timer (struct i_poll_events *events_); +        void add_timer (int timeout_, struct i_poll_events *events_, int id_); +        void cancel_timer (struct i_poll_events *events_, int id_);          int get_load ();          void start ();          void stop (); diff --git a/src/select.cpp b/src/select.cpp index 59eb83e..f169239 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -136,12 +136,12 @@ void zmq::select_t::reset_pollout (handle_t handle_)      FD_CLR (handle_, &source_set_out);  } -void zmq::select_t::add_timer (i_poll_events *events_) +void zmq::select_t::add_timer (int timeout_, i_poll_events *events_, int id_)  {      timers.push_back (events_);  } -void zmq::select_t::cancel_timer (i_poll_events *events_) +void zmq::select_t::cancel_timer (i_poll_events *events_, int id_)  {      timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);      if (it != timers.end ()) @@ -199,7 +199,7 @@ void zmq::select_t::loop ()              //  Trigger all the timers.              for (timers_t::iterator it = t.begin (); it != t.end (); it ++) -                (*it)->timer_event (); +                (*it)->timer_event (-1);              continue;          } diff --git a/src/select.hpp b/src/select.hpp index 01e9fa8..ae19fe1 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -60,8 +60,8 @@ namespace zmq          void reset_pollin (handle_t handle_);          void set_pollout (handle_t handle_);          void reset_pollout (handle_t handle_); -        void add_timer (struct i_poll_events *events_); -        void cancel_timer (struct i_poll_events *events_); +        void add_timer (int timeout_, struct i_poll_events *events_, int id_); +        void cancel_timer (struct i_poll_events *events_, int id_);          int get_load ();          void start ();          void stop (); diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index cfca875..7cb58d3 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -42,7 +42,7 @@ zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_,  zmq::zmq_connecter_t::~zmq_connecter_t ()  {      if (wait) -        cancel_timer (); +        cancel_timer (reconnect_timer_id);      if (handle_valid)          rm_fd (handle);  } @@ -50,7 +50,7 @@ zmq::zmq_connecter_t::~zmq_connecter_t ()  void zmq::zmq_connecter_t::process_plug ()  {      if (wait) -        add_timer (); +        add_timer (reconnect_period, reconnect_timer_id);      else          start_connecting ();  } @@ -73,7 +73,7 @@ void zmq::zmq_connecter_t::out_event ()      if (fd == retired_fd) {          tcp_connecter.close ();          wait = true; -        add_timer (); +        add_timer (reconnect_period, reconnect_timer_id);          return;      } @@ -121,5 +121,5 @@ void zmq::zmq_connecter_t::start_connecting ()      //  Handle any other error condition by eventual reconnect.      wait = true; -    add_timer (); +    add_timer (reconnect_period, reconnect_timer_id);  } diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index f1a4c30..381e020 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -42,6 +42,9 @@ namespace zmq      private: +        //  ID of the timer used to delay the reconnection. +        enum {reconnect_timer_id = 1}; +          //  Handlers for incoming commands.          void process_plug ();  | 
