From d14be62499478f31cf641399982ecdf4f6f158c4 Mon Sep 17 00:00:00 2001 From: Steven McCoy Date: Tue, 28 Sep 2010 22:46:56 +0200 Subject: more fixes to (e)pgm transport --- src/pgm_sender.cpp | 37 +++++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) (limited to 'src/pgm_sender.cpp') diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 5c9020d..82a37ab 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -123,8 +123,19 @@ zmq::pgm_sender_t::~pgm_sender_t () void zmq::pgm_sender_t::in_event () { + if (has_rx_timer) { + cancel_timer (rx_timer_id); + has_rx_timer = false; + } + // In event on sender side means NAK or SPMR receiving from some peer. pgm_socket.process_upstream (); + const int last_errno = errno; + if (last_errno == ENOMEM || last_errno == EBUSY) { + const long timeout = pgm_socket.get_rx_timeout (); + add_timer (timeout, rx_timer_id); + has_rx_timer = true; + } } void zmq::pgm_sender_t::out_event () @@ -152,14 +163,36 @@ void zmq::pgm_sender_t::out_event () put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset); } + if (has_rx_timer) { + cancel_timer (rx_timer_id); + has_rx_timer = false; + } + // Send the data. size_t nbytes = pgm_socket.send (out_buffer, write_size); // We can write either all data or 0 which means rate limit reached. - if (nbytes == write_size) + if (nbytes == write_size) { write_size = 0; - else + } else { zmq_assert (nbytes == 0); + + if (errno == ENOMEM) { + const long timeout = pgm_socket.get_tx_timeout (); + add_timer (timeout, tx_timer_id); + has_tx_timer = true; + } else + zmq_assert (errno == EBUSY); + } +} + +void zmq::pgm_sender_t::timer_event (int token) +{ + if (token == rx_timer_id) + in_event (); + + zmq_assert (token == tx_timer_id); + out_event (); } #endif -- cgit v1.2.3