diff options
Diffstat (limited to 'src/pgm_receiver.cpp')
-rw-r--r-- | src/pgm_receiver.cpp | 21 |
1 files changed, 20 insertions, 1 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index ceae0da..01b95b2 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -135,6 +135,11 @@ void zmq::pgm_receiver_t::in_event () zmq_assert (pending_bytes == 0); + if (has_rx_timer) { + cancel_timer (rx_timer_id); + has_rx_timer = false; + } + // TODO: This loop can effectively block other engines in the same I/O // thread in the case of high load. while (true) { @@ -144,8 +149,15 @@ void zmq::pgm_receiver_t::in_event () // No data to process. This may happen if the packet received is // neither ODATA nor ODATA. - if (received < 0) + if (received == 0) { + const int last_errno = errno; + if (last_errno == ENOMEM || last_errno == EBUSY) { + const long timeout = pgm_socket.get_rx_timeout (); + add_timer (timeout, rx_timer_id); + has_rx_timer = true; + } break; + } // Find the peer based on its TSI. peers_t::iterator it = peers.find (*tsi); @@ -219,5 +231,12 @@ void zmq::pgm_receiver_t::in_event () inout->flush (); } +void zmq::pgm_receiver_t::timer_event (int token) +{ + zmq_assert (token == rx_timer_id); + + in_event (); +} + #endif |