summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authormalosek <malosek@fastmq.com>2009-09-29 13:56:19 +0200
committermalosek <malosek@fastmq.com>2009-09-29 13:56:19 +0200
commit64e68e748607473befbcf2d96590d45dc7bc98db (patch)
treeb5397a6cbd98c63d6b5b8ffbcf4397327e227b55 /src
parent39d915ded8ccb612ae1f9aaefcd93f349f4c8f4c (diff)
detecting data loss for PGM2 receiver
Diffstat (limited to 'src')
-rw-r--r--src/pgm_receiver.cpp10
-rw-r--r--src/pgm_socket.cpp40
2 files changed, 44 insertions, 6 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 7af6ed5..2ffb9ab 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -160,8 +160,13 @@ void zmq::pgm_receiver_t::in_event ()
peer_info_t peer_info = {false, NULL};
it = peers.insert (std::make_pair (*tsi, peer_info)).first;
+#ifdef ZMQ_HAVE_OPENPGM1
zmq_log (1, "New peer TSI: %s, %s(%i).\n", pgm_print_tsi (tsi),
__FILE__, __LINE__);
+#elif ZMQ_HAVE_OPENPGM2
+ zmq_log (1, "New peer TSI: %s, %s(%i).\n", pgm_tsi_print (tsi),
+ __FILE__, __LINE__);
+#endif
}
// There is not beginning of the message in current APDU and we
@@ -187,8 +192,13 @@ void zmq::pgm_receiver_t::in_event ()
it->second.decoder = new zmq_decoder_t;
it->second.decoder->set_inout (inout);
+#ifdef ZMQ_HAVE_OPENPGM1
zmq_log (1, "Peer %s joined into the stream, %s(%i)\n",
pgm_print_tsi (tsi), __FILE__, __LINE__);
+#elif ZMQ_HAVE_OPENPGM2
+ zmq_log (1, "Peer %s joined into the stream, %s(%i)\n",
+ pgm_tsi_print (tsi), __FILE__, __LINE__);
+#endif
}
if (nbytes > 0) {
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index dbfe52d..e8a3b16 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -275,7 +275,11 @@ int zmq::pgm_socket_t::open_transport (void)
// Set transport->can_send_data = FALSE.
// Note that NAKs are still generated by the transport.
+#ifdef ZMQ_HAVE_OPENPGM1
+ rc = pgm_transport_set_recv_only (g_transport, false);
+#elif ZMQ_HAVE_OPENPGM2
rc = pgm_transport_set_recv_only (g_transport, true, false);
+#endif
if (rc != pgm_ok) {
errno = EINVAL;
return -1;
@@ -710,12 +714,14 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
if (nbytes_rec == -1 && errno == ECONNRESET) {
// Save lost data TSI.
- *tsi_ = &(g_transport->lost_data_tsi);
+ *tsi_ = &g_transport->lost_data_tsi;
- // In case of dala loss -1 is returned.
zmq_log (1, "Data loss detected %s, %s(%i)\n",
- pgm_print_tsi (&(g_transport->lost_data_tsi)), __FILE__, __LINE__);
+ pgm_print_tsi (&g_transport->lost_data_tsi), __FILE__, __LINE__);
+
nbytes_rec = 0;
+
+ // In case of dala loss -1 is returned.
return -1;
}
@@ -723,7 +729,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
if (nbytes_rec <= 0) {
zmq_log (2, "received %i B, errno %i, %s(%i).\n", (int)nbytes_rec,
errno, __FILE__, __LINE__);
- errno_assert (nbytes_rec > 0);
+ errno_assert (false);
}
#elif defined ZMQ_HAVE_OPENPGM2
GError *pgm_error = NULL;
@@ -740,14 +746,36 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// pgm_recvmsg returns ?.
if (status == PGM_IO_STATUS_AGAIN ||
status == PGM_IO_STATUS_AGAIN2) {
-
+
+ zmq_assert (nbytes_rec == 0);
+
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
nbytes_rec = 0;
return 0;
}
- // Data loss?
+ // Data loss.
+ if (status == PGM_IO_STATUS_RESET) {
+
+ zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n",
+ status, (int) nbytes_rec, __FILE__, __LINE__);
+
+ pgm_peer_t* peer = (pgm_peer_t*) g_transport->peers_pending->data;
+
+ // Save lost data TSI.
+ *tsi_ = &peer->tsi;
+
+ zmq_log (1, "Data loss detected %s, %s(%i)\n", pgm_tsi_print (&peer->tsi),
+ __FILE__, __LINE__);
+
+ nbytes_rec = 0;
+
+ // In case of dala loss -1 is returned.
+ return -1;
+ }
+
+ // Catch the rest of the errors.
if (status != PGM_IO_STATUS_NORMAL) {
zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n",
status, (int) nbytes_rec, __FILE__, __LINE__);