From 64e68e748607473befbcf2d96590d45dc7bc98db Mon Sep 17 00:00:00 2001 From: malosek Date: Tue, 29 Sep 2009 13:56:19 +0200 Subject: detecting data loss for PGM2 receiver --- src/pgm_socket.cpp | 40 ++++++++++++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 6 deletions(-) (limited to 'src/pgm_socket.cpp') diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index dbfe52d..e8a3b16 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -275,7 +275,11 @@ int zmq::pgm_socket_t::open_transport (void) // Set transport->can_send_data = FALSE. // Note that NAKs are still generated by the transport. +#ifdef ZMQ_HAVE_OPENPGM1 + rc = pgm_transport_set_recv_only (g_transport, false); +#elif ZMQ_HAVE_OPENPGM2 rc = pgm_transport_set_recv_only (g_transport, true, false); +#endif if (rc != pgm_ok) { errno = EINVAL; return -1; @@ -710,12 +714,14 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) if (nbytes_rec == -1 && errno == ECONNRESET) { // Save lost data TSI. - *tsi_ = &(g_transport->lost_data_tsi); + *tsi_ = &g_transport->lost_data_tsi; - // In case of dala loss -1 is returned. zmq_log (1, "Data loss detected %s, %s(%i)\n", - pgm_print_tsi (&(g_transport->lost_data_tsi)), __FILE__, __LINE__); + pgm_print_tsi (&g_transport->lost_data_tsi), __FILE__, __LINE__); + nbytes_rec = 0; + + // In case of dala loss -1 is returned. return -1; } @@ -723,7 +729,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) if (nbytes_rec <= 0) { zmq_log (2, "received %i B, errno %i, %s(%i).\n", (int)nbytes_rec, errno, __FILE__, __LINE__); - errno_assert (nbytes_rec > 0); + errno_assert (false); } #elif defined ZMQ_HAVE_OPENPGM2 GError *pgm_error = NULL; @@ -740,14 +746,36 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // pgm_recvmsg returns ?. if (status == PGM_IO_STATUS_AGAIN || status == PGM_IO_STATUS_AGAIN2) { - + + zmq_assert (nbytes_rec == 0); + // In case if no RDATA/ODATA caused POLLIN 0 is // returned. nbytes_rec = 0; return 0; } - // Data loss? + // Data loss. + if (status == PGM_IO_STATUS_RESET) { + + zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n", + status, (int) nbytes_rec, __FILE__, __LINE__); + + pgm_peer_t* peer = (pgm_peer_t*) g_transport->peers_pending->data; + + // Save lost data TSI. + *tsi_ = &peer->tsi; + + zmq_log (1, "Data loss detected %s, %s(%i)\n", pgm_tsi_print (&peer->tsi), + __FILE__, __LINE__); + + nbytes_rec = 0; + + // In case of dala loss -1 is returned. + return -1; + } + + // Catch the rest of the errors. if (status != PGM_IO_STATUS_NORMAL) { zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n", status, (int) nbytes_rec, __FILE__, __LINE__); -- cgit v1.2.3