diff options
author | malosek <malosek@fastmq.com> | 2009-09-29 13:56:19 +0200 |
---|---|---|
committer | malosek <malosek@fastmq.com> | 2009-09-29 13:56:19 +0200 |
commit | 64e68e748607473befbcf2d96590d45dc7bc98db (patch) | |
tree | b5397a6cbd98c63d6b5b8ffbcf4397327e227b55 /src | |
parent | 39d915ded8ccb612ae1f9aaefcd93f349f4c8f4c (diff) |
detecting data loss for PGM2 receiver
Diffstat (limited to 'src')
-rw-r--r-- | src/pgm_receiver.cpp | 10 | ||||
-rw-r--r-- | src/pgm_socket.cpp | 40 |
2 files changed, 44 insertions, 6 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 7af6ed5..2ffb9ab 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -160,8 +160,13 @@ void zmq::pgm_receiver_t::in_event () peer_info_t peer_info = {false, NULL}; it = peers.insert (std::make_pair (*tsi, peer_info)).first; +#ifdef ZMQ_HAVE_OPENPGM1 zmq_log (1, "New peer TSI: %s, %s(%i).\n", pgm_print_tsi (tsi), __FILE__, __LINE__); +#elif ZMQ_HAVE_OPENPGM2 + zmq_log (1, "New peer TSI: %s, %s(%i).\n", pgm_tsi_print (tsi), + __FILE__, __LINE__); +#endif } // There is not beginning of the message in current APDU and we @@ -187,8 +192,13 @@ void zmq::pgm_receiver_t::in_event () it->second.decoder = new zmq_decoder_t; it->second.decoder->set_inout (inout); +#ifdef ZMQ_HAVE_OPENPGM1 zmq_log (1, "Peer %s joined into the stream, %s(%i)\n", pgm_print_tsi (tsi), __FILE__, __LINE__); +#elif ZMQ_HAVE_OPENPGM2 + zmq_log (1, "Peer %s joined into the stream, %s(%i)\n", + pgm_tsi_print (tsi), __FILE__, __LINE__); +#endif } if (nbytes > 0) { diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index dbfe52d..e8a3b16 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -275,7 +275,11 @@ int zmq::pgm_socket_t::open_transport (void) // Set transport->can_send_data = FALSE. // Note that NAKs are still generated by the transport. +#ifdef ZMQ_HAVE_OPENPGM1 + rc = pgm_transport_set_recv_only (g_transport, false); +#elif ZMQ_HAVE_OPENPGM2 rc = pgm_transport_set_recv_only (g_transport, true, false); +#endif if (rc != pgm_ok) { errno = EINVAL; return -1; @@ -710,12 +714,14 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) if (nbytes_rec == -1 && errno == ECONNRESET) { // Save lost data TSI. - *tsi_ = &(g_transport->lost_data_tsi); + *tsi_ = &g_transport->lost_data_tsi; - // In case of dala loss -1 is returned. zmq_log (1, "Data loss detected %s, %s(%i)\n", - pgm_print_tsi (&(g_transport->lost_data_tsi)), __FILE__, __LINE__); + pgm_print_tsi (&g_transport->lost_data_tsi), __FILE__, __LINE__); + nbytes_rec = 0; + + // In case of dala loss -1 is returned. return -1; } @@ -723,7 +729,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) if (nbytes_rec <= 0) { zmq_log (2, "received %i B, errno %i, %s(%i).\n", (int)nbytes_rec, errno, __FILE__, __LINE__); - errno_assert (nbytes_rec > 0); + errno_assert (false); } #elif defined ZMQ_HAVE_OPENPGM2 GError *pgm_error = NULL; @@ -740,14 +746,36 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // pgm_recvmsg returns ?. if (status == PGM_IO_STATUS_AGAIN || status == PGM_IO_STATUS_AGAIN2) { - + + zmq_assert (nbytes_rec == 0); + // In case if no RDATA/ODATA caused POLLIN 0 is // returned. nbytes_rec = 0; return 0; } - // Data loss? + // Data loss. + if (status == PGM_IO_STATUS_RESET) { + + zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n", + status, (int) nbytes_rec, __FILE__, __LINE__); + + pgm_peer_t* peer = (pgm_peer_t*) g_transport->peers_pending->data; + + // Save lost data TSI. + *tsi_ = &peer->tsi; + + zmq_log (1, "Data loss detected %s, %s(%i)\n", pgm_tsi_print (&peer->tsi), + __FILE__, __LINE__); + + nbytes_rec = 0; + + // In case of dala loss -1 is returned. + return -1; + } + + // Catch the rest of the errors. if (status != PGM_IO_STATUS_NORMAL) { zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n", status, (int) nbytes_rec, __FILE__, __LINE__); |