summaryrefslogtreecommitdiff
path: root/src/pgm_receiver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/pgm_receiver.cpp')
-rw-r--r--src/pgm_receiver.cpp35
1 files changed, 30 insertions, 5 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 048c529..01b95b2 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -55,7 +55,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
return pgm_socket.init (udp_encapsulation_, network_);
}
-void zmq::pgm_receiver_t::plug (i_inout *inout_)
+void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_)
{
// Retrieve PGM fds and start polling.
int socket_fd;
@@ -88,12 +88,18 @@ void zmq::pgm_receiver_t::unplug ()
inout = NULL;
}
-void zmq::pgm_receiver_t::revive ()
+void zmq::pgm_receiver_t::terminate ()
+{
+ unplug ();
+ delete this;
+}
+
+void zmq::pgm_receiver_t::activate_out ()
{
zmq_assert (false);
}
-void zmq::pgm_receiver_t::resume_input ()
+void zmq::pgm_receiver_t::activate_in ()
{
// It is possible that the most recently used decoder
// processed the whole buffer but failed to write
@@ -129,6 +135,11 @@ void zmq::pgm_receiver_t::in_event ()
zmq_assert (pending_bytes == 0);
+ if (has_rx_timer) {
+ cancel_timer (rx_timer_id);
+ has_rx_timer = false;
+ }
+
// TODO: This loop can effectively block other engines in the same I/O
// thread in the case of high load.
while (true) {
@@ -138,8 +149,15 @@ void zmq::pgm_receiver_t::in_event ()
// No data to process. This may happen if the packet received is
// neither ODATA nor ODATA.
- if (received == 0)
+ if (received == 0) {
+ const int last_errno = errno;
+ if (last_errno == ENOMEM || last_errno == EBUSY) {
+ const long timeout = pgm_socket.get_rx_timeout ();
+ add_timer (timeout, rx_timer_id);
+ has_rx_timer = true;
+ }
break;
+ }
// Find the peer based on its TSI.
peers_t::iterator it = peers.find (*tsi);
@@ -189,7 +207,7 @@ void zmq::pgm_receiver_t::in_event ()
it->second.joined = true;
// Create and connect decoder for the peer.
- it->second.decoder = new (std::nothrow) zmq_decoder_t (0);
+ it->second.decoder = new (std::nothrow) decoder_t (0);
it->second.decoder->set_inout (inout);
}
@@ -213,5 +231,12 @@ void zmq::pgm_receiver_t::in_event ()
inout->flush ();
}
+void zmq::pgm_receiver_t::timer_event (int token)
+{
+ zmq_assert (token == rx_timer_id);
+
+ in_event ();
+}
+
#endif