diff options
Diffstat (limited to 'src/pgm_sender.cpp')
-rw-r--r-- | src/pgm_sender.cpp | 49 |
1 files changed, 44 insertions, 5 deletions
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 9aeb7a9..82a37ab 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -58,7 +58,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) return rc; } -void zmq::pgm_sender_t::plug (i_inout *inout_) +void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_) { // Alocate 2 fds for PGM socket. int downlink_socket_fd = 0; @@ -96,13 +96,19 @@ void zmq::pgm_sender_t::unplug () encoder.set_inout (NULL); } -void zmq::pgm_sender_t::revive () +void zmq::pgm_sender_t::terminate () +{ + unplug (); + delete this; +} + +void zmq::pgm_sender_t::activate_out () { set_pollout (handle); out_event (); } -void zmq::pgm_sender_t::resume_input () +void zmq::pgm_sender_t::activate_in () { zmq_assert (false); } @@ -117,8 +123,19 @@ zmq::pgm_sender_t::~pgm_sender_t () void zmq::pgm_sender_t::in_event () { + if (has_rx_timer) { + cancel_timer (rx_timer_id); + has_rx_timer = false; + } + // In event on sender side means NAK or SPMR receiving from some peer. pgm_socket.process_upstream (); + const int last_errno = errno; + if (last_errno == ENOMEM || last_errno == EBUSY) { + const long timeout = pgm_socket.get_rx_timeout (); + add_timer (timeout, rx_timer_id); + has_rx_timer = true; + } } void zmq::pgm_sender_t::out_event () @@ -146,14 +163,36 @@ void zmq::pgm_sender_t::out_event () put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset); } + if (has_rx_timer) { + cancel_timer (rx_timer_id); + has_rx_timer = false; + } + // Send the data. size_t nbytes = pgm_socket.send (out_buffer, write_size); // We can write either all data or 0 which means rate limit reached. - if (nbytes == write_size) + if (nbytes == write_size) { write_size = 0; - else + } else { zmq_assert (nbytes == 0); + + if (errno == ENOMEM) { + const long timeout = pgm_socket.get_tx_timeout (); + add_timer (timeout, tx_timer_id); + has_tx_timer = true; + } else + zmq_assert (errno == EBUSY); + } +} + +void zmq::pgm_sender_t::timer_event (int token) +{ + if (token == rx_timer_id) + in_event (); + + zmq_assert (token == tx_timer_id); + out_event (); } #endif |