summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/pgm_receiver.cpp21
-rw-r--r--src/pgm_receiver.hpp7
-rw-r--r--src/pgm_sender.cpp37
-rw-r--r--src/pgm_sender.hpp7
-rw-r--r--src/pgm_socket.cpp81
-rw-r--r--src/pgm_socket.hpp5
6 files changed, 134 insertions, 24 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index ceae0da..01b95b2 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -135,6 +135,11 @@ void zmq::pgm_receiver_t::in_event ()
zmq_assert (pending_bytes == 0);
+ if (has_rx_timer) {
+ cancel_timer (rx_timer_id);
+ has_rx_timer = false;
+ }
+
// TODO: This loop can effectively block other engines in the same I/O
// thread in the case of high load.
while (true) {
@@ -144,8 +149,15 @@ void zmq::pgm_receiver_t::in_event ()
// No data to process. This may happen if the packet received is
// neither ODATA nor ODATA.
- if (received < 0)
+ if (received == 0) {
+ const int last_errno = errno;
+ if (last_errno == ENOMEM || last_errno == EBUSY) {
+ const long timeout = pgm_socket.get_rx_timeout ();
+ add_timer (timeout, rx_timer_id);
+ has_rx_timer = true;
+ }
break;
+ }
// Find the peer based on its TSI.
peers_t::iterator it = peers.find (*tsi);
@@ -219,5 +231,12 @@ void zmq::pgm_receiver_t::in_event ()
inout->flush ();
}
+void zmq::pgm_receiver_t::timer_event (int token)
+{
+ zmq_assert (token == rx_timer_id);
+
+ in_event ();
+}
+
#endif
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index f32d37e..627a658 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -59,9 +59,16 @@ namespace zmq
// i_poll_events interface implementation.
void in_event ();
+ void timer_event (int token);
private:
+ // RX timeout timer ID.
+ enum {rx_timer_id = 0xa1};
+
+ // RX timer is running.
+ bool has_rx_timer;
+
// If joined is true we are already getting messages from the peer.
// It it's false, we are getting data but still we haven't seen
// beginning of a message.
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 5c9020d..82a37ab 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -123,8 +123,19 @@ zmq::pgm_sender_t::~pgm_sender_t ()
void zmq::pgm_sender_t::in_event ()
{
+ if (has_rx_timer) {
+ cancel_timer (rx_timer_id);
+ has_rx_timer = false;
+ }
+
// In event on sender side means NAK or SPMR receiving from some peer.
pgm_socket.process_upstream ();
+ const int last_errno = errno;
+ if (last_errno == ENOMEM || last_errno == EBUSY) {
+ const long timeout = pgm_socket.get_rx_timeout ();
+ add_timer (timeout, rx_timer_id);
+ has_rx_timer = true;
+ }
}
void zmq::pgm_sender_t::out_event ()
@@ -152,14 +163,36 @@ void zmq::pgm_sender_t::out_event ()
put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);
}
+ if (has_rx_timer) {
+ cancel_timer (rx_timer_id);
+ has_rx_timer = false;
+ }
+
// Send the data.
size_t nbytes = pgm_socket.send (out_buffer, write_size);
// We can write either all data or 0 which means rate limit reached.
- if (nbytes == write_size)
+ if (nbytes == write_size) {
write_size = 0;
- else
+ } else {
zmq_assert (nbytes == 0);
+
+ if (errno == ENOMEM) {
+ const long timeout = pgm_socket.get_tx_timeout ();
+ add_timer (timeout, tx_timer_id);
+ has_tx_timer = true;
+ } else
+ zmq_assert (errno == EBUSY);
+ }
+}
+
+void zmq::pgm_sender_t::timer_event (int token)
+{
+ if (token == rx_timer_id)
+ in_event ();
+
+ zmq_assert (token == tx_timer_id);
+ out_event ();
}
#endif
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index bee416c..6d3ae31 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -58,9 +58,16 @@ namespace zmq
// i_poll_events interface implementation.
void in_event ();
void out_event ();
+ void timer_event (int token);
private:
+ // TX and RX timeout timer ID's.
+ enum {tx_timer_id = 0xa0, rx_timer_id = 0xa1};
+
+ // Timers are running.
+ bool has_tx_timer, has_rx_timer;
+
// Message encoder.
encoder_t encoder;
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 3d55e69..6a6f549 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -417,18 +417,55 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
const int status = pgm_send (sock, data_, data_len_, &nbytes);
- if (nbytes != data_len_) {
- zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK);
- zmq_assert (nbytes == 0);
- }
-
// We have to write all data as one packet.
- if (nbytes > 0)
+ if (nbytes > 0) {
+ zmq_assert (status == PGM_IO_STATUS_NORMAL);
zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_);
+ } else {
+ zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK);
+
+ if (status == PGM_IO_STATUS_RATE_LIMITED)
+ errno = ENOMEM;
+ else
+ errno = EBUSY;
+ }
+
+ // Save return value.
+ last_tx_status = status;
return nbytes;
}
+long zmq::pgm_socket_t::get_rx_timeout ()
+{
+ if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED && last_rx_status != PGM_IO_STATUS_TIMER_PENDING)
+ return -1;
+
+ struct timeval tv;
+ socklen_t optlen = sizeof (tv);
+ const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, last_rx_status == PGM_IO_STATUS_RATE_LIMITED ? PGM_RATE_REMAIN : PGM_TIME_REMAIN, &tv, &optlen);
+ zmq_assert (rc);
+
+ const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+
+ return timeout;
+}
+
+long zmq::pgm_socket_t::get_tx_timeout ()
+{
+ if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED)
+ return -1;
+
+ struct timeval tv;
+ socklen_t optlen = sizeof (tv);
+ const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen);
+ zmq_assert (rc);
+
+ const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+
+ return timeout;
+}
+
// Return max TSDU size without fragmentation from current PGM transport.
size_t zmq::pgm_socket_t::get_max_tsdu_size ()
{
@@ -457,7 +494,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
nbytes_processed = 0;
pgm_msgv_processed = 0;
errno = EAGAIN;
- return -1;
+ return 0;
}
// If we have are going first time or if we have processed all pgm_msgv_t
@@ -479,23 +516,19 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// Invalid parameters
zmq_assert (status != PGM_IO_STATUS_ERROR);
+ last_rx_status = status;
+
// In a case when no ODATA/RDATA fired POLLIN event (SPM...)
// pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING.
if (status == PGM_IO_STATUS_TIMER_PENDING) {
zmq_assert (nbytes_rec == 0);
- struct timeval tv;
- socklen_t optlen = sizeof (tv);
- const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &optlen);
-
- zmq_assert (rc);
-
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
nbytes_rec = 0;
errno = EBUSY;
- return -1;
+ return 0;
}
// Send SPMR, NAK, ACK is rate limited.
@@ -503,15 +536,11 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert (nbytes_rec == 0);
- struct timeval tv;
- socklen_t optlen = sizeof (tv);
- const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen);
-
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
nbytes_rec = 0;
- errno = EBUSY;
- return -1;
+ errno = ENOMEM;
+ return 0;
}
// No peers and hence no incoming packets.
@@ -523,7 +552,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// returned.
nbytes_rec = 0;
errno = EAGAIN;
- return -1;
+ return 0;
}
// Data loss.
@@ -548,6 +577,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
}
+ // Zero byte payloads are valid in PGM, but not 0MQ protocol.
zmq_assert (nbytes_rec > 0);
// Only one APDU per pgm_msgv_t structure is allowed.
@@ -587,6 +617,15 @@ void zmq::pgm_socket_t::process_upstream ()
// No data should be returned.
zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING ||
status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK));
+
+ last_rx_status = status;
+
+ if (status == PGM_IO_STATUS_TIMER_PENDING)
+ errno = EBUSY;
+ else if (status == PGM_IO_STATUS_RATE_LIMITED)
+ errno = ENOMEM;
+ else
+ errno = EAGAIN;
}
#endif
diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp
index 334e42f..142fdcc 100644
--- a/src/pgm_socket.hpp
+++ b/src/pgm_socket.hpp
@@ -67,6 +67,9 @@ namespace zmq
// Receive data from pgm socket.
ssize_t receive (void **data_, const pgm_tsi_t **tsi_);
+ long get_rx_timeout ();
+ long get_tx_timeout ();
+
// POLLIN on sender side should mean NAK or SPMR receiving.
// process_upstream function is used to handle such a situation.
void process_upstream ();
@@ -76,6 +79,8 @@ namespace zmq
// OpenPGM transport
pgm_sock_t* sock;
+ int last_rx_status, last_tx_status;
+
// Associated socket options.
options_t options;