summaryrefslogtreecommitdiff
path: root/src/pgm_sender.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/pgm_sender.cpp')
-rw-r--r--src/pgm_sender.cpp49
1 files changed, 44 insertions, 5 deletions
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 9aeb7a9..82a37ab 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -58,7 +58,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
return rc;
}
-void zmq::pgm_sender_t::plug (i_inout *inout_)
+void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_)
{
// Alocate 2 fds for PGM socket.
int downlink_socket_fd = 0;
@@ -96,13 +96,19 @@ void zmq::pgm_sender_t::unplug ()
encoder.set_inout (NULL);
}
-void zmq::pgm_sender_t::revive ()
+void zmq::pgm_sender_t::terminate ()
+{
+ unplug ();
+ delete this;
+}
+
+void zmq::pgm_sender_t::activate_out ()
{
set_pollout (handle);
out_event ();
}
-void zmq::pgm_sender_t::resume_input ()
+void zmq::pgm_sender_t::activate_in ()
{
zmq_assert (false);
}
@@ -117,8 +123,19 @@ zmq::pgm_sender_t::~pgm_sender_t ()
void zmq::pgm_sender_t::in_event ()
{
+ if (has_rx_timer) {
+ cancel_timer (rx_timer_id);
+ has_rx_timer = false;
+ }
+
// In event on sender side means NAK or SPMR receiving from some peer.
pgm_socket.process_upstream ();
+ const int last_errno = errno;
+ if (last_errno == ENOMEM || last_errno == EBUSY) {
+ const long timeout = pgm_socket.get_rx_timeout ();
+ add_timer (timeout, rx_timer_id);
+ has_rx_timer = true;
+ }
}
void zmq::pgm_sender_t::out_event ()
@@ -146,14 +163,36 @@ void zmq::pgm_sender_t::out_event ()
put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);
}
+ if (has_rx_timer) {
+ cancel_timer (rx_timer_id);
+ has_rx_timer = false;
+ }
+
// Send the data.
size_t nbytes = pgm_socket.send (out_buffer, write_size);
// We can write either all data or 0 which means rate limit reached.
- if (nbytes == write_size)
+ if (nbytes == write_size) {
write_size = 0;
- else
+ } else {
zmq_assert (nbytes == 0);
+
+ if (errno == ENOMEM) {
+ const long timeout = pgm_socket.get_tx_timeout ();
+ add_timer (timeout, tx_timer_id);
+ has_tx_timer = true;
+ } else
+ zmq_assert (errno == EBUSY);
+ }
+}
+
+void zmq::pgm_sender_t::timer_event (int token)
+{
+ if (token == rx_timer_id)
+ in_event ();
+
+ zmq_assert (token == tx_timer_id);
+ out_event ();
}
#endif