summaryrefslogtreecommitdiff
path: root/src/pgm_receiver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/pgm_receiver.cpp')
-rw-r--r--src/pgm_receiver.cpp21
1 files changed, 20 insertions, 1 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index ceae0da..01b95b2 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -135,6 +135,11 @@ void zmq::pgm_receiver_t::in_event ()
zmq_assert (pending_bytes == 0);
+ if (has_rx_timer) {
+ cancel_timer (rx_timer_id);
+ has_rx_timer = false;
+ }
+
// TODO: This loop can effectively block other engines in the same I/O
// thread in the case of high load.
while (true) {
@@ -144,8 +149,15 @@ void zmq::pgm_receiver_t::in_event ()
// No data to process. This may happen if the packet received is
// neither ODATA nor ODATA.
- if (received < 0)
+ if (received == 0) {
+ const int last_errno = errno;
+ if (last_errno == ENOMEM || last_errno == EBUSY) {
+ const long timeout = pgm_socket.get_rx_timeout ();
+ add_timer (timeout, rx_timer_id);
+ has_rx_timer = true;
+ }
break;
+ }
// Find the peer based on its TSI.
peers_t::iterator it = peers.find (*tsi);
@@ -219,5 +231,12 @@ void zmq::pgm_receiver_t::in_event ()
inout->flush ();
}
+void zmq::pgm_receiver_t::timer_event (int token)
+{
+ zmq_assert (token == rx_timer_id);
+
+ in_event ();
+}
+
#endif