summaryrefslogtreecommitdiff
path: root/src/pgm_socket.cpp
diff options
context:
space:
mode:
authorSteven McCoy <steven.mccoy@miru.hk>2010-09-28 22:46:56 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-09-30 09:11:51 +0200
commitd14be62499478f31cf641399982ecdf4f6f158c4 (patch)
tree5e9e3d5b5097c1a5942180d0bf9c20569c8e3797 /src/pgm_socket.cpp
parent96d85b20982926e60d5065cba3203971c9eeed63 (diff)
more fixes to (e)pgm transport
Diffstat (limited to 'src/pgm_socket.cpp')
-rw-r--r--src/pgm_socket.cpp81
1 files changed, 60 insertions, 21 deletions
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 3d55e69..6a6f549 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -417,18 +417,55 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
const int status = pgm_send (sock, data_, data_len_, &nbytes);
- if (nbytes != data_len_) {
- zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK);
- zmq_assert (nbytes == 0);
- }
-
// We have to write all data as one packet.
- if (nbytes > 0)
+ if (nbytes > 0) {
+ zmq_assert (status == PGM_IO_STATUS_NORMAL);
zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_);
+ } else {
+ zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK);
+
+ if (status == PGM_IO_STATUS_RATE_LIMITED)
+ errno = ENOMEM;
+ else
+ errno = EBUSY;
+ }
+
+ // Save return value.
+ last_tx_status = status;
return nbytes;
}
+long zmq::pgm_socket_t::get_rx_timeout ()
+{
+ if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED && last_rx_status != PGM_IO_STATUS_TIMER_PENDING)
+ return -1;
+
+ struct timeval tv;
+ socklen_t optlen = sizeof (tv);
+ const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, last_rx_status == PGM_IO_STATUS_RATE_LIMITED ? PGM_RATE_REMAIN : PGM_TIME_REMAIN, &tv, &optlen);
+ zmq_assert (rc);
+
+ const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+
+ return timeout;
+}
+
+long zmq::pgm_socket_t::get_tx_timeout ()
+{
+ if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED)
+ return -1;
+
+ struct timeval tv;
+ socklen_t optlen = sizeof (tv);
+ const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen);
+ zmq_assert (rc);
+
+ const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+
+ return timeout;
+}
+
// Return max TSDU size without fragmentation from current PGM transport.
size_t zmq::pgm_socket_t::get_max_tsdu_size ()
{
@@ -457,7 +494,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
nbytes_processed = 0;
pgm_msgv_processed = 0;
errno = EAGAIN;
- return -1;
+ return 0;
}
// If we have are going first time or if we have processed all pgm_msgv_t
@@ -479,23 +516,19 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// Invalid parameters
zmq_assert (status != PGM_IO_STATUS_ERROR);
+ last_rx_status = status;
+
// In a case when no ODATA/RDATA fired POLLIN event (SPM...)
// pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING.
if (status == PGM_IO_STATUS_TIMER_PENDING) {
zmq_assert (nbytes_rec == 0);
- struct timeval tv;
- socklen_t optlen = sizeof (tv);
- const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &optlen);
-
- zmq_assert (rc);
-
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
nbytes_rec = 0;
errno = EBUSY;
- return -1;
+ return 0;
}
// Send SPMR, NAK, ACK is rate limited.
@@ -503,15 +536,11 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert (nbytes_rec == 0);
- struct timeval tv;
- socklen_t optlen = sizeof (tv);
- const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen);
-
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
nbytes_rec = 0;
- errno = EBUSY;
- return -1;
+ errno = ENOMEM;
+ return 0;
}
// No peers and hence no incoming packets.
@@ -523,7 +552,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// returned.
nbytes_rec = 0;
errno = EAGAIN;
- return -1;
+ return 0;
}
// Data loss.
@@ -548,6 +577,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
}
+ // Zero byte payloads are valid in PGM, but not 0MQ protocol.
zmq_assert (nbytes_rec > 0);
// Only one APDU per pgm_msgv_t structure is allowed.
@@ -587,6 +617,15 @@ void zmq::pgm_socket_t::process_upstream ()
// No data should be returned.
zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING ||
status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK));
+
+ last_rx_status = status;
+
+ if (status == PGM_IO_STATUS_TIMER_PENDING)
+ errno = EBUSY;
+ else if (status == PGM_IO_STATUS_RATE_LIMITED)
+ errno = ENOMEM;
+ else
+ errno = EAGAIN;
}
#endif