summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-10-15 10:58:19 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-10-15 10:58:19 +0200
commitb64b50ae218dbbc362eaeb0571a337650f623e3c (patch)
tree38247a0addc1da446a54f6621a7aab68227514c1 /src
parente288f7a347eb5a2fd38043d930fc67c8e8bcce9b (diff)
Timers correctly canceled by PGM engines on shutdown.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/pgm_receiver.cpp10
-rw-r--r--src/pgm_sender.cpp17
-rw-r--r--src/pgm_sender.hpp3
3 files changed, 23 insertions, 7 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 30a28ce..c1b35f1 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -36,6 +36,7 @@
zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
const options_t &options_) :
io_object_t (parent_),
+ has_rx_timer (false),
pgm_socket (true, options_),
options (options_),
inout (NULL),
@@ -81,7 +82,11 @@ void zmq::pgm_receiver_t::unplug ()
mru_decoder = NULL;
pending_bytes = 0;
- // Stop polling.
+ if (has_rx_timer) {
+ cancel_timer (rx_timer_id);
+ has_rx_timer = false;
+ }
+
rm_fd (socket_handle);
rm_fd (pipe_handle);
@@ -150,8 +155,7 @@ void zmq::pgm_receiver_t::in_event ()
// No data to process. This may happen if the packet received is
// neither ODATA nor ODATA.
if (received == 0) {
- const int last_errno = errno;
- if (last_errno == ENOMEM || last_errno == EBUSY) {
+ if (errno == ENOMEM || errno == EBUSY) {
const long timeout = pgm_socket.get_rx_timeout ();
add_timer (timeout, rx_timer_id);
has_rx_timer = true;
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 3fc0d90..55a8d4e 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -36,6 +36,8 @@
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
const options_t &options_) :
io_object_t (parent_),
+ has_tx_timer (false),
+ has_rx_timer (false),
encoder (0),
pgm_socket (false, options_),
options (options_),
@@ -89,6 +91,16 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_)
void zmq::pgm_sender_t::unplug ()
{
+ if (has_rx_timer) {
+ cancel_timer (rx_timer_id);
+ has_rx_timer = false;
+ }
+
+ if (has_tx_timer) {
+ cancel_timer (tx_timer_id);
+ has_tx_timer = false;
+ }
+
rm_fd (handle);
rm_fd (uplink_handle);
rm_fd (rdata_notify_handle);
@@ -128,10 +140,9 @@ void zmq::pgm_sender_t::in_event ()
has_rx_timer = false;
}
- // In event on sender side means NAK or SPMR receiving from some peer.
+ // In-event on sender side means NAK or SPMR receiving from some peer.
pgm_socket.process_upstream ();
- const int last_errno = errno;
- if (last_errno == ENOMEM || last_errno == EBUSY) {
+ if (errno == ENOMEM || errno == EBUSY) {
const long timeout = pgm_socket.get_rx_timeout ();
add_timer (timeout, rx_timer_id);
has_rx_timer = true;
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 6d3ae31..9270ba0 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -66,7 +66,8 @@ namespace zmq
enum {tx_timer_id = 0xa0, rx_timer_id = 0xa1};
// Timers are running.
- bool has_tx_timer, has_rx_timer;
+ bool has_tx_timer;
+ bool has_rx_timer;
// Message encoder.
encoder_t encoder;