summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-09-26 18:30:03 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-09-26 18:30:03 +0200
commitcf815e8c785254d97190f223765fbbd19a1e6d52 (patch)
tree5b76f6ad2d5e71eab8491957321834b7b5183e7b
parentbe79a9fbc26a55c90b693485f69dfd17f710cb13 (diff)
new interfaces for timers; the implementation is not changed yet
-rw-r--r--src/config.hpp4
-rw-r--r--src/devpoll.cpp6
-rw-r--r--src/devpoll.hpp7
-rw-r--r--src/epoll.cpp6
-rw-r--r--src/epoll.hpp4
-rw-r--r--src/i_poll_events.hpp2
-rw-r--r--src/io_object.cpp10
-rw-r--r--src/io_object.hpp6
-rw-r--r--src/io_thread.cpp2
-rw-r--r--src/io_thread.hpp2
-rw-r--r--src/kqueue.cpp6
-rw-r--r--src/kqueue.hpp4
-rw-r--r--src/poll.cpp6
-rw-r--r--src/poll.hpp4
-rw-r--r--src/select.cpp6
-rw-r--r--src/select.hpp4
-rw-r--r--src/zmq_connecter.cpp8
-rw-r--r--src/zmq_connecter.hpp3
18 files changed, 48 insertions, 42 deletions
diff --git a/src/config.hpp b/src/config.hpp
index 81b5a2a..bceb41c 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -69,9 +69,13 @@ namespace zmq
// Maximum number of events the I/O thread can process in one go.
max_io_events = 256,
+// TODO: To be removed
// Maximal wait time for a timer (milliseconds).
max_timer_period = 100,
+ // How long to wait (milliseconds) till reattempting to connect.
+ reconnect_period = 100,
+
// Maximal delay to process command in API thread (in CPU ticks).
// 3,000,000 ticks equals to 1 - 2 milliseconds on current CPUs.
// Note that delay is only applied when there is continuous stream of
diff --git a/src/devpoll.cpp b/src/devpoll.cpp
index 003f465..7054c2b 100644
--- a/src/devpoll.cpp
+++ b/src/devpoll.cpp
@@ -128,12 +128,12 @@ void zmq::devpoll_t::reset_pollout (handle_t handle_)
devpoll_ctl (handle_, fd_table [handle_].events);
}
-void zmq::devpoll_t::add_timer (i_poll_events *events_)
+void zmq::devpoll_t::add_timer (int timeout_, i_poll_events *events_, int id_)
{
timers.push_back (events_);
}
-void zmq::devpoll_t::cancel_timer (i_poll_events *events_)
+void zmq::devpoll_t::cancel_timer (i_poll_events *events_, int id_)
{
timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
if (it != timers.end ())
@@ -190,7 +190,7 @@ void zmq::devpoll_t::loop ()
// Trigger all the timers.
for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
- (*it)->timer_event ();
+ (*it)->timer_event (-1);
continue;
}
diff --git a/src/devpoll.hpp b/src/devpoll.hpp
index 019d268..00be385 100644
--- a/src/devpoll.hpp
+++ b/src/devpoll.hpp
@@ -33,8 +33,7 @@
namespace zmq
{
- // Implements socket polling mechanism using the Solaris-specific
- // "/dev/poll" interface.
+ // Implements socket polling mechanism using the "/dev/poll" interface.
class devpoll_t
{
@@ -52,8 +51,8 @@ namespace zmq
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer (struct i_poll_events *events_);
- void cancel_timer (struct i_poll_events *events_);
+ void add_timer (int timeout_, struct i_poll_events *events_, int id_);
+ void cancel_timer (struct i_poll_events *events_, int id_);
int get_load ();
void start ();
void stop ();
diff --git a/src/epoll.cpp b/src/epoll.cpp
index e22eb8c..bbad639 100644
--- a/src/epoll.cpp
+++ b/src/epoll.cpp
@@ -120,12 +120,12 @@ void zmq::epoll_t::reset_pollout (handle_t handle_)
errno_assert (rc != -1);
}
-void zmq::epoll_t::add_timer (i_poll_events *events_)
+void zmq::epoll_t::add_timer (int timeout_, i_poll_events *events_, int id_)
{
timers.push_back (events_);
}
-void zmq::epoll_t::cancel_timer (i_poll_events *events_)
+void zmq::epoll_t::cancel_timer (i_poll_events *events_, int id_)
{
timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
if (it == timers.end ())
@@ -175,7 +175,7 @@ void zmq::epoll_t::loop ()
// Trigger all the timers.
for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
- (*it)->timer_event ();
+ (*it)->timer_event (-1);
continue;
}
diff --git a/src/epoll.hpp b/src/epoll.hpp
index 38175cb..a68f055 100644
--- a/src/epoll.hpp
+++ b/src/epoll.hpp
@@ -53,8 +53,8 @@ namespace zmq
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer (struct i_poll_events *events_);
- void cancel_timer (struct i_poll_events *events_);
+ void add_timer (int timeout_, struct i_poll_events *events_, int id_);
+ void cancel_timer (struct i_poll_events *events_, int id_);
int get_load ();
void start ();
void stop ();
diff --git a/src/i_poll_events.hpp b/src/i_poll_events.hpp
index 6d474b2..8e70921 100644
--- a/src/i_poll_events.hpp
+++ b/src/i_poll_events.hpp
@@ -37,7 +37,7 @@ namespace zmq
virtual void out_event () = 0;
// Called when timer expires.
- virtual void timer_event () = 0;
+ virtual void timer_event (int id_) = 0;
};
}
diff --git a/src/io_object.cpp b/src/io_object.cpp
index b3b45ee..d2620a6 100644
--- a/src/io_object.cpp
+++ b/src/io_object.cpp
@@ -80,14 +80,14 @@ void zmq::io_object_t::reset_pollout (handle_t handle_)
poller->reset_pollout (handle_);
}
-void zmq::io_object_t::add_timer ()
+void zmq::io_object_t::add_timer (int timeout_, int id_)
{
- poller->add_timer (this);
+ poller->add_timer (timeout_, this, id_);
}
-void zmq::io_object_t::cancel_timer ()
+void zmq::io_object_t::cancel_timer (int id_)
{
- poller->cancel_timer (this);
+ poller->cancel_timer (this, id_);
}
void zmq::io_object_t::in_event ()
@@ -100,7 +100,7 @@ void zmq::io_object_t::out_event ()
zmq_assert (false);
}
-void zmq::io_object_t::timer_event ()
+void zmq::io_object_t::timer_event (int id_)
{
zmq_assert (false);
}
diff --git a/src/io_object.hpp b/src/io_object.hpp
index 284e6d1..ba69acc 100644
--- a/src/io_object.hpp
+++ b/src/io_object.hpp
@@ -56,13 +56,13 @@ namespace zmq
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer ();
- void cancel_timer ();
+ void add_timer (int timout_, int id_);
+ void cancel_timer (int id_);
// i_poll_events interface implementation.
void in_event ();
void out_event ();
- void timer_event ();
+ void timer_event (int id_);
private:
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index d1d95f3..05a5eb2 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -89,7 +89,7 @@ void zmq::io_thread_t::out_event ()
zmq_assert (false);
}
-void zmq::io_thread_t::timer_event ()
+void zmq::io_thread_t::timer_event (int id_)
{
// No timers here. This function is never called.
zmq_assert (false);
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index 9e7c2ea..20d4ae3 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -56,7 +56,7 @@ namespace zmq
// i_poll_events implementation.
void in_event ();
void out_event ();
- void timer_event ();
+ void timer_event (int id_);
// Used by io_objects to retrieve the assciated poller object.
poller_t *get_poller ();
diff --git a/src/kqueue.cpp b/src/kqueue.cpp
index e1fe2fa..f76a08f 100644
--- a/src/kqueue.cpp
+++ b/src/kqueue.cpp
@@ -132,12 +132,12 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_)
kevent_delete (pe->fd, EVFILT_WRITE);
}
-void zmq::kqueue_t::add_timer (i_poll_events *events_)
+void zmq::kqueue_t::add_timer (int timeout_, i_poll_events *events_, int id_)
{
timers.push_back (events_);
}
-void zmq::kqueue_t::cancel_timer (i_poll_events *events_)
+void zmq::kqueue_t::cancel_timer (i_poll_events *events_, int id_)
{
timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
if (it != timers.end ())
@@ -186,7 +186,7 @@ void zmq::kqueue_t::loop ()
// Trigger all the timers.
for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
- (*it)->timer_event ();
+ (*it)->timer_event (-1);
continue;
}
diff --git a/src/kqueue.hpp b/src/kqueue.hpp
index ac28a7d..43c2a39 100644
--- a/src/kqueue.hpp
+++ b/src/kqueue.hpp
@@ -53,8 +53,8 @@ namespace zmq
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer (struct i_poll_events *events_);
- void cancel_timer (struct i_poll_events *events_);
+ void add_timer (int timeout_, struct i_poll_events *events_, int id_);
+ void cancel_timer (struct i_poll_events *events_, int id_);
int get_load ();
void start ();
void stop ();
diff --git a/src/poll.cpp b/src/poll.cpp
index 1b203db..513c405 100644
--- a/src/poll.cpp
+++ b/src/poll.cpp
@@ -112,12 +112,12 @@ void zmq::poll_t::reset_pollout (handle_t handle_)
pollset [index].events &= ~((short) POLLOUT);
}
-void zmq::poll_t::add_timer (i_poll_events *events_)
+void zmq::poll_t::add_timer (int timeout_, i_poll_events *events_, int id_)
{
timers.push_back (events_);
}
-void zmq::poll_t::cancel_timer (i_poll_events *events_)
+void zmq::poll_t::cancel_timer (i_poll_events *events_, int id_)
{
timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
if (it != timers.end ())
@@ -160,7 +160,7 @@ void zmq::poll_t::loop ()
// Trigger all the timers.
for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
- (*it)->timer_event ();
+ (*it)->timer_event (-1);
continue;
}
diff --git a/src/poll.hpp b/src/poll.hpp
index f4ae35a..96d18dd 100644
--- a/src/poll.hpp
+++ b/src/poll.hpp
@@ -58,8 +58,8 @@ namespace zmq
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer (struct i_poll_events *events_);
- void cancel_timer (struct i_poll_events *events_);
+ void add_timer (int timeout_, struct i_poll_events *events_, int id_);
+ void cancel_timer (struct i_poll_events *events_, int id_);
int get_load ();
void start ();
void stop ();
diff --git a/src/select.cpp b/src/select.cpp
index 59eb83e..f169239 100644
--- a/src/select.cpp
+++ b/src/select.cpp
@@ -136,12 +136,12 @@ void zmq::select_t::reset_pollout (handle_t handle_)
FD_CLR (handle_, &source_set_out);
}
-void zmq::select_t::add_timer (i_poll_events *events_)
+void zmq::select_t::add_timer (int timeout_, i_poll_events *events_, int id_)
{
timers.push_back (events_);
}
-void zmq::select_t::cancel_timer (i_poll_events *events_)
+void zmq::select_t::cancel_timer (i_poll_events *events_, int id_)
{
timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
if (it != timers.end ())
@@ -199,7 +199,7 @@ void zmq::select_t::loop ()
// Trigger all the timers.
for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
- (*it)->timer_event ();
+ (*it)->timer_event (-1);
continue;
}
diff --git a/src/select.hpp b/src/select.hpp
index 01e9fa8..ae19fe1 100644
--- a/src/select.hpp
+++ b/src/select.hpp
@@ -60,8 +60,8 @@ namespace zmq
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer (struct i_poll_events *events_);
- void cancel_timer (struct i_poll_events *events_);
+ void add_timer (int timeout_, struct i_poll_events *events_, int id_);
+ void cancel_timer (struct i_poll_events *events_, int id_);
int get_load ();
void start ();
void stop ();
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index cfca875..7cb58d3 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -42,7 +42,7 @@ zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_,
zmq::zmq_connecter_t::~zmq_connecter_t ()
{
if (wait)
- cancel_timer ();
+ cancel_timer (reconnect_timer_id);
if (handle_valid)
rm_fd (handle);
}
@@ -50,7 +50,7 @@ zmq::zmq_connecter_t::~zmq_connecter_t ()
void zmq::zmq_connecter_t::process_plug ()
{
if (wait)
- add_timer ();
+ add_timer (reconnect_period, reconnect_timer_id);
else
start_connecting ();
}
@@ -73,7 +73,7 @@ void zmq::zmq_connecter_t::out_event ()
if (fd == retired_fd) {
tcp_connecter.close ();
wait = true;
- add_timer ();
+ add_timer (reconnect_period, reconnect_timer_id);
return;
}
@@ -121,5 +121,5 @@ void zmq::zmq_connecter_t::start_connecting ()
// Handle any other error condition by eventual reconnect.
wait = true;
- add_timer ();
+ add_timer (reconnect_period, reconnect_timer_id);
}
diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp
index f1a4c30..381e020 100644
--- a/src/zmq_connecter.hpp
+++ b/src/zmq_connecter.hpp
@@ -42,6 +42,9 @@ namespace zmq
private:
+ // ID of the timer used to delay the reconnection.
+ enum {reconnect_timer_id = 1};
+
// Handlers for incoming commands.
void process_plug ();