summaryrefslogtreecommitdiff
path: root/src/pgm_socket.cpp
diff options
context:
space:
mode:
authormalosek <malosek@fastmq.com>2009-09-29 13:56:19 +0200
committermalosek <malosek@fastmq.com>2009-09-29 13:56:19 +0200
commit64e68e748607473befbcf2d96590d45dc7bc98db (patch)
treeb5397a6cbd98c63d6b5b8ffbcf4397327e227b55 /src/pgm_socket.cpp
parent39d915ded8ccb612ae1f9aaefcd93f349f4c8f4c (diff)
detecting data loss for PGM2 receiver
Diffstat (limited to 'src/pgm_socket.cpp')
-rw-r--r--src/pgm_socket.cpp40
1 files changed, 34 insertions, 6 deletions
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index dbfe52d..e8a3b16 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -275,7 +275,11 @@ int zmq::pgm_socket_t::open_transport (void)
// Set transport->can_send_data = FALSE.
// Note that NAKs are still generated by the transport.
+#ifdef ZMQ_HAVE_OPENPGM1
+ rc = pgm_transport_set_recv_only (g_transport, false);
+#elif ZMQ_HAVE_OPENPGM2
rc = pgm_transport_set_recv_only (g_transport, true, false);
+#endif
if (rc != pgm_ok) {
errno = EINVAL;
return -1;
@@ -710,12 +714,14 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
if (nbytes_rec == -1 && errno == ECONNRESET) {
// Save lost data TSI.
- *tsi_ = &(g_transport->lost_data_tsi);
+ *tsi_ = &g_transport->lost_data_tsi;
- // In case of dala loss -1 is returned.
zmq_log (1, "Data loss detected %s, %s(%i)\n",
- pgm_print_tsi (&(g_transport->lost_data_tsi)), __FILE__, __LINE__);
+ pgm_print_tsi (&g_transport->lost_data_tsi), __FILE__, __LINE__);
+
nbytes_rec = 0;
+
+ // In case of dala loss -1 is returned.
return -1;
}
@@ -723,7 +729,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
if (nbytes_rec <= 0) {
zmq_log (2, "received %i B, errno %i, %s(%i).\n", (int)nbytes_rec,
errno, __FILE__, __LINE__);
- errno_assert (nbytes_rec > 0);
+ errno_assert (false);
}
#elif defined ZMQ_HAVE_OPENPGM2
GError *pgm_error = NULL;
@@ -740,14 +746,36 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// pgm_recvmsg returns ?.
if (status == PGM_IO_STATUS_AGAIN ||
status == PGM_IO_STATUS_AGAIN2) {
-
+
+ zmq_assert (nbytes_rec == 0);
+
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
nbytes_rec = 0;
return 0;
}
- // Data loss?
+ // Data loss.
+ if (status == PGM_IO_STATUS_RESET) {
+
+ zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n",
+ status, (int) nbytes_rec, __FILE__, __LINE__);
+
+ pgm_peer_t* peer = (pgm_peer_t*) g_transport->peers_pending->data;
+
+ // Save lost data TSI.
+ *tsi_ = &peer->tsi;
+
+ zmq_log (1, "Data loss detected %s, %s(%i)\n", pgm_tsi_print (&peer->tsi),
+ __FILE__, __LINE__);
+
+ nbytes_rec = 0;
+
+ // In case of dala loss -1 is returned.
+ return -1;
+ }
+
+ // Catch the rest of the errors.
if (status != PGM_IO_STATUS_NORMAL) {
zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n",
status, (int) nbytes_rec, __FILE__, __LINE__);