summaryrefslogtreecommitdiff
path: root/src/pgm_sender.cpp
diff options
context:
space:
mode:
authorSteven McCoy <steven.mccoy@miru.hk>2010-09-28 22:46:56 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-09-30 09:11:51 +0200
commitd14be62499478f31cf641399982ecdf4f6f158c4 (patch)
tree5e9e3d5b5097c1a5942180d0bf9c20569c8e3797 /src/pgm_sender.cpp
parent96d85b20982926e60d5065cba3203971c9eeed63 (diff)
more fixes to (e)pgm transport
Diffstat (limited to 'src/pgm_sender.cpp')
-rw-r--r--src/pgm_sender.cpp37
1 files changed, 35 insertions, 2 deletions
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 5c9020d..82a37ab 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -123,8 +123,19 @@ zmq::pgm_sender_t::~pgm_sender_t ()
void zmq::pgm_sender_t::in_event ()
{
+ if (has_rx_timer) {
+ cancel_timer (rx_timer_id);
+ has_rx_timer = false;
+ }
+
// In event on sender side means NAK or SPMR receiving from some peer.
pgm_socket.process_upstream ();
+ const int last_errno = errno;
+ if (last_errno == ENOMEM || last_errno == EBUSY) {
+ const long timeout = pgm_socket.get_rx_timeout ();
+ add_timer (timeout, rx_timer_id);
+ has_rx_timer = true;
+ }
}
void zmq::pgm_sender_t::out_event ()
@@ -152,14 +163,36 @@ void zmq::pgm_sender_t::out_event ()
put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);
}
+ if (has_rx_timer) {
+ cancel_timer (rx_timer_id);
+ has_rx_timer = false;
+ }
+
// Send the data.
size_t nbytes = pgm_socket.send (out_buffer, write_size);
// We can write either all data or 0 which means rate limit reached.
- if (nbytes == write_size)
+ if (nbytes == write_size) {
write_size = 0;
- else
+ } else {
zmq_assert (nbytes == 0);
+
+ if (errno == ENOMEM) {
+ const long timeout = pgm_socket.get_tx_timeout ();
+ add_timer (timeout, tx_timer_id);
+ has_tx_timer = true;
+ } else
+ zmq_assert (errno == EBUSY);
+ }
+}
+
+void zmq::pgm_sender_t::timer_event (int token)
+{
+ if (token == rx_timer_id)
+ in_event ();
+
+ zmq_assert (token == tx_timer_id);
+ out_event ();
}
#endif