From d14be62499478f31cf641399982ecdf4f6f158c4 Mon Sep 17 00:00:00 2001 From: Steven McCoy Date: Tue, 28 Sep 2010 22:46:56 +0200 Subject: more fixes to (e)pgm transport --- src/pgm_receiver.cpp | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) (limited to 'src/pgm_receiver.cpp') diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index ceae0da..01b95b2 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -135,6 +135,11 @@ void zmq::pgm_receiver_t::in_event () zmq_assert (pending_bytes == 0); + if (has_rx_timer) { + cancel_timer (rx_timer_id); + has_rx_timer = false; + } + // TODO: This loop can effectively block other engines in the same I/O // thread in the case of high load. while (true) { @@ -144,8 +149,15 @@ void zmq::pgm_receiver_t::in_event () // No data to process. This may happen if the packet received is // neither ODATA nor ODATA. - if (received < 0) + if (received == 0) { + const int last_errno = errno; + if (last_errno == ENOMEM || last_errno == EBUSY) { + const long timeout = pgm_socket.get_rx_timeout (); + add_timer (timeout, rx_timer_id); + has_rx_timer = true; + } break; + } // Find the peer based on its TSI. peers_t::iterator it = peers.find (*tsi); @@ -219,5 +231,12 @@ void zmq::pgm_receiver_t::in_event () inout->flush (); } +void zmq::pgm_receiver_t::timer_event (int token) +{ + zmq_assert (token == rx_timer_id); + + in_event (); +} + #endif -- cgit v1.2.3