diff options
| -rw-r--r-- | src/pgm_receiver.cpp | 10 | ||||
| -rw-r--r-- | src/pgm_socket.cpp | 40 | 
2 files changed, 44 insertions, 6 deletions
| diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 7af6ed5..2ffb9ab 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -160,8 +160,13 @@ void zmq::pgm_receiver_t::in_event ()              peer_info_t peer_info = {false, NULL};              it = peers.insert (std::make_pair (*tsi, peer_info)).first; +#ifdef ZMQ_HAVE_OPENPGM1              zmq_log (1, "New peer TSI: %s, %s(%i).\n", pgm_print_tsi (tsi),                  __FILE__, __LINE__); +#elif ZMQ_HAVE_OPENPGM2 +            zmq_log (1, "New peer TSI: %s, %s(%i).\n", pgm_tsi_print (tsi), +                __FILE__, __LINE__); +#endif          }          //  There is not beginning of the message in current APDU and we @@ -187,8 +192,13 @@ void zmq::pgm_receiver_t::in_event ()              it->second.decoder = new zmq_decoder_t;              it->second.decoder->set_inout (inout); +#ifdef ZMQ_HAVE_OPENPGM1              zmq_log (1, "Peer %s joined into the stream, %s(%i)\n",                   pgm_print_tsi (tsi), __FILE__, __LINE__); +#elif ZMQ_HAVE_OPENPGM2 +             zmq_log (1, "Peer %s joined into the stream, %s(%i)\n",  +                pgm_tsi_print (tsi), __FILE__, __LINE__); +#endif          }          if (nbytes > 0) { diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index dbfe52d..e8a3b16 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -275,7 +275,11 @@ int zmq::pgm_socket_t::open_transport (void)          //  Set transport->can_send_data = FALSE.          //  Note that NAKs are still generated by the transport. +#ifdef ZMQ_HAVE_OPENPGM1 +        rc = pgm_transport_set_recv_only (g_transport, false); +#elif ZMQ_HAVE_OPENPGM2          rc = pgm_transport_set_recv_only (g_transport, true, false); +#endif          if (rc != pgm_ok) {              errno = EINVAL;              return -1; @@ -710,12 +714,14 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)          if (nbytes_rec == -1 && errno == ECONNRESET) {              //  Save lost data TSI. -            *tsi_ = &(g_transport->lost_data_tsi); +            *tsi_ = &g_transport->lost_data_tsi; -            //  In case of dala loss -1 is returned.              zmq_log (1, "Data loss detected %s, %s(%i)\n",  -                pgm_print_tsi (&(g_transport->lost_data_tsi)), __FILE__, __LINE__); +                pgm_print_tsi (&g_transport->lost_data_tsi), __FILE__, __LINE__); +              nbytes_rec = 0; + +            //  In case of dala loss -1 is returned.              return -1;          } @@ -723,7 +729,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)          if (nbytes_rec <= 0) {              zmq_log (2, "received %i B, errno %i, %s(%i).\n", (int)nbytes_rec,                   errno, __FILE__, __LINE__); -            errno_assert (nbytes_rec > 0); +            errno_assert (false);          }  #elif defined ZMQ_HAVE_OPENPGM2          GError *pgm_error = NULL; @@ -740,14 +746,36 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)          //  pgm_recvmsg returns ?.          if (status == PGM_IO_STATUS_AGAIN ||                status == PGM_IO_STATUS_AGAIN2) { -         +        +            zmq_assert (nbytes_rec == 0); +              //  In case if no RDATA/ODATA caused POLLIN 0 is               //  returned.              nbytes_rec = 0;              return 0;          } -        //  Data loss?  +        //  Data loss. +        if (status == PGM_IO_STATUS_RESET) { + +            zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n",  +                status, (int) nbytes_rec, __FILE__, __LINE__); + +            pgm_peer_t* peer = (pgm_peer_t*) g_transport->peers_pending->data; + +            //  Save lost data TSI. +            *tsi_ = &peer->tsi; + +            zmq_log (1, "Data loss detected %s, %s(%i)\n", pgm_tsi_print (&peer->tsi), +                __FILE__, __LINE__); + +            nbytes_rec = 0; + +            //  In case of dala loss -1 is returned. +            return -1; +        } + +        //  Catch the rest of the errors.           if (status != PGM_IO_STATUS_NORMAL) {              zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n",                   status, (int) nbytes_rec, __FILE__, __LINE__); | 
