summaryrefslogtreecommitdiff
path: root/src/pgm_sender.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/pgm_sender.cpp')
-rw-r--r--src/pgm_sender.cpp37
1 files changed, 35 insertions, 2 deletions
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 5c9020d..82a37ab 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -123,8 +123,19 @@ zmq::pgm_sender_t::~pgm_sender_t ()
void zmq::pgm_sender_t::in_event ()
{
+ if (has_rx_timer) {
+ cancel_timer (rx_timer_id);
+ has_rx_timer = false;
+ }
+
// In event on sender side means NAK or SPMR receiving from some peer.
pgm_socket.process_upstream ();
+ const int last_errno = errno;
+ if (last_errno == ENOMEM || last_errno == EBUSY) {
+ const long timeout = pgm_socket.get_rx_timeout ();
+ add_timer (timeout, rx_timer_id);
+ has_rx_timer = true;
+ }
}
void zmq::pgm_sender_t::out_event ()
@@ -152,14 +163,36 @@ void zmq::pgm_sender_t::out_event ()
put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);
}
+ if (has_rx_timer) {
+ cancel_timer (rx_timer_id);
+ has_rx_timer = false;
+ }
+
// Send the data.
size_t nbytes = pgm_socket.send (out_buffer, write_size);
// We can write either all data or 0 which means rate limit reached.
- if (nbytes == write_size)
+ if (nbytes == write_size) {
write_size = 0;
- else
+ } else {
zmq_assert (nbytes == 0);
+
+ if (errno == ENOMEM) {
+ const long timeout = pgm_socket.get_tx_timeout ();
+ add_timer (timeout, tx_timer_id);
+ has_tx_timer = true;
+ } else
+ zmq_assert (errno == EBUSY);
+ }
+}
+
+void zmq::pgm_sender_t::timer_event (int token)
+{
+ if (token == rx_timer_id)
+ in_event ();
+
+ zmq_assert (token == tx_timer_id);
+ out_event ();
}
#endif