diff options
Diffstat (limited to 'src/pgm_receiver.cpp')
-rw-r--r-- | src/pgm_receiver.cpp | 35 |
1 files changed, 30 insertions, 5 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 048c529..01b95b2 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -55,7 +55,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) return pgm_socket.init (udp_encapsulation_, network_); } -void zmq::pgm_receiver_t::plug (i_inout *inout_) +void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_) { // Retrieve PGM fds and start polling. int socket_fd; @@ -88,12 +88,18 @@ void zmq::pgm_receiver_t::unplug () inout = NULL; } -void zmq::pgm_receiver_t::revive () +void zmq::pgm_receiver_t::terminate () +{ + unplug (); + delete this; +} + +void zmq::pgm_receiver_t::activate_out () { zmq_assert (false); } -void zmq::pgm_receiver_t::resume_input () +void zmq::pgm_receiver_t::activate_in () { // It is possible that the most recently used decoder // processed the whole buffer but failed to write @@ -129,6 +135,11 @@ void zmq::pgm_receiver_t::in_event () zmq_assert (pending_bytes == 0); + if (has_rx_timer) { + cancel_timer (rx_timer_id); + has_rx_timer = false; + } + // TODO: This loop can effectively block other engines in the same I/O // thread in the case of high load. while (true) { @@ -138,8 +149,15 @@ void zmq::pgm_receiver_t::in_event () // No data to process. This may happen if the packet received is // neither ODATA nor ODATA. - if (received == 0) + if (received == 0) { + const int last_errno = errno; + if (last_errno == ENOMEM || last_errno == EBUSY) { + const long timeout = pgm_socket.get_rx_timeout (); + add_timer (timeout, rx_timer_id); + has_rx_timer = true; + } break; + } // Find the peer based on its TSI. peers_t::iterator it = peers.find (*tsi); @@ -189,7 +207,7 @@ void zmq::pgm_receiver_t::in_event () it->second.joined = true; // Create and connect decoder for the peer. - it->second.decoder = new (std::nothrow) zmq_decoder_t (0); + it->second.decoder = new (std::nothrow) decoder_t (0); it->second.decoder->set_inout (inout); } @@ -213,5 +231,12 @@ void zmq::pgm_receiver_t::in_event () inout->flush (); } +void zmq::pgm_receiver_t::timer_event (int token) +{ + zmq_assert (token == rx_timer_id); + + in_event (); +} + #endif |