From d14be62499478f31cf641399982ecdf4f6f158c4 Mon Sep 17 00:00:00 2001 From: Steven McCoy Date: Tue, 28 Sep 2010 22:46:56 +0200 Subject: more fixes to (e)pgm transport --- src/pgm_socket.cpp | 81 ++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 60 insertions(+), 21 deletions(-) (limited to 'src/pgm_socket.cpp') diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 3d55e69..6a6f549 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -417,18 +417,55 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) const int status = pgm_send (sock, data_, data_len_, &nbytes); - if (nbytes != data_len_) { - zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK); - zmq_assert (nbytes == 0); - } - // We have to write all data as one packet. - if (nbytes > 0) + if (nbytes > 0) { + zmq_assert (status == PGM_IO_STATUS_NORMAL); zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_); + } else { + zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK); + + if (status == PGM_IO_STATUS_RATE_LIMITED) + errno = ENOMEM; + else + errno = EBUSY; + } + + // Save return value. + last_tx_status = status; return nbytes; } +long zmq::pgm_socket_t::get_rx_timeout () +{ + if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED && last_rx_status != PGM_IO_STATUS_TIMER_PENDING) + return -1; + + struct timeval tv; + socklen_t optlen = sizeof (tv); + const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, last_rx_status == PGM_IO_STATUS_RATE_LIMITED ? PGM_RATE_REMAIN : PGM_TIME_REMAIN, &tv, &optlen); + zmq_assert (rc); + + const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + + return timeout; +} + +long zmq::pgm_socket_t::get_tx_timeout () +{ + if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED) + return -1; + + struct timeval tv; + socklen_t optlen = sizeof (tv); + const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen); + zmq_assert (rc); + + const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + + return timeout; +} + // Return max TSDU size without fragmentation from current PGM transport. size_t zmq::pgm_socket_t::get_max_tsdu_size () { @@ -457,7 +494,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) nbytes_processed = 0; pgm_msgv_processed = 0; errno = EAGAIN; - return -1; + return 0; } // If we have are going first time or if we have processed all pgm_msgv_t @@ -479,23 +516,19 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // Invalid parameters zmq_assert (status != PGM_IO_STATUS_ERROR); + last_rx_status = status; + // In a case when no ODATA/RDATA fired POLLIN event (SPM...) // pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING. if (status == PGM_IO_STATUS_TIMER_PENDING) { zmq_assert (nbytes_rec == 0); - struct timeval tv; - socklen_t optlen = sizeof (tv); - const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &optlen); - - zmq_assert (rc); - // In case if no RDATA/ODATA caused POLLIN 0 is // returned. nbytes_rec = 0; errno = EBUSY; - return -1; + return 0; } // Send SPMR, NAK, ACK is rate limited. @@ -503,15 +536,11 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) zmq_assert (nbytes_rec == 0); - struct timeval tv; - socklen_t optlen = sizeof (tv); - const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen); - // In case if no RDATA/ODATA caused POLLIN 0 is // returned. nbytes_rec = 0; - errno = EBUSY; - return -1; + errno = ENOMEM; + return 0; } // No peers and hence no incoming packets. @@ -523,7 +552,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // returned. nbytes_rec = 0; errno = EAGAIN; - return -1; + return 0; } // Data loss. @@ -548,6 +577,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) zmq_assert (pgm_msgv_processed <= pgm_msgv_len); } + // Zero byte payloads are valid in PGM, but not 0MQ protocol. zmq_assert (nbytes_rec > 0); // Only one APDU per pgm_msgv_t structure is allowed. @@ -587,6 +617,15 @@ void zmq::pgm_socket_t::process_upstream () // No data should be returned. zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK)); + + last_rx_status = status; + + if (status == PGM_IO_STATUS_TIMER_PENDING) + errno = EBUSY; + else if (status == PGM_IO_STATUS_RATE_LIMITED) + errno = ENOMEM; + else + errno = EAGAIN; } #endif -- cgit v1.2.3