From d14be62499478f31cf641399982ecdf4f6f158c4 Mon Sep 17 00:00:00 2001 From: Steven McCoy Date: Tue, 28 Sep 2010 22:46:56 +0200 Subject: more fixes to (e)pgm transport --- src/pgm_receiver.cpp | 21 +++++++++++++- src/pgm_receiver.hpp | 7 +++++ src/pgm_sender.cpp | 37 ++++++++++++++++++++++-- src/pgm_sender.hpp | 7 +++++ src/pgm_socket.cpp | 81 ++++++++++++++++++++++++++++++++++++++-------------- src/pgm_socket.hpp | 5 ++++ 6 files changed, 134 insertions(+), 24 deletions(-) diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index ceae0da..01b95b2 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -135,6 +135,11 @@ void zmq::pgm_receiver_t::in_event () zmq_assert (pending_bytes == 0); + if (has_rx_timer) { + cancel_timer (rx_timer_id); + has_rx_timer = false; + } + // TODO: This loop can effectively block other engines in the same I/O // thread in the case of high load. while (true) { @@ -144,8 +149,15 @@ void zmq::pgm_receiver_t::in_event () // No data to process. This may happen if the packet received is // neither ODATA nor ODATA. - if (received < 0) + if (received == 0) { + const int last_errno = errno; + if (last_errno == ENOMEM || last_errno == EBUSY) { + const long timeout = pgm_socket.get_rx_timeout (); + add_timer (timeout, rx_timer_id); + has_rx_timer = true; + } break; + } // Find the peer based on its TSI. peers_t::iterator it = peers.find (*tsi); @@ -219,5 +231,12 @@ void zmq::pgm_receiver_t::in_event () inout->flush (); } +void zmq::pgm_receiver_t::timer_event (int token) +{ + zmq_assert (token == rx_timer_id); + + in_event (); +} + #endif diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index f32d37e..627a658 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -59,9 +59,16 @@ namespace zmq // i_poll_events interface implementation. void in_event (); + void timer_event (int token); private: + // RX timeout timer ID. + enum {rx_timer_id = 0xa1}; + + // RX timer is running. + bool has_rx_timer; + // If joined is true we are already getting messages from the peer. // It it's false, we are getting data but still we haven't seen // beginning of a message. diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 5c9020d..82a37ab 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -123,8 +123,19 @@ zmq::pgm_sender_t::~pgm_sender_t () void zmq::pgm_sender_t::in_event () { + if (has_rx_timer) { + cancel_timer (rx_timer_id); + has_rx_timer = false; + } + // In event on sender side means NAK or SPMR receiving from some peer. pgm_socket.process_upstream (); + const int last_errno = errno; + if (last_errno == ENOMEM || last_errno == EBUSY) { + const long timeout = pgm_socket.get_rx_timeout (); + add_timer (timeout, rx_timer_id); + has_rx_timer = true; + } } void zmq::pgm_sender_t::out_event () @@ -152,14 +163,36 @@ void zmq::pgm_sender_t::out_event () put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset); } + if (has_rx_timer) { + cancel_timer (rx_timer_id); + has_rx_timer = false; + } + // Send the data. size_t nbytes = pgm_socket.send (out_buffer, write_size); // We can write either all data or 0 which means rate limit reached. - if (nbytes == write_size) + if (nbytes == write_size) { write_size = 0; - else + } else { zmq_assert (nbytes == 0); + + if (errno == ENOMEM) { + const long timeout = pgm_socket.get_tx_timeout (); + add_timer (timeout, tx_timer_id); + has_tx_timer = true; + } else + zmq_assert (errno == EBUSY); + } +} + +void zmq::pgm_sender_t::timer_event (int token) +{ + if (token == rx_timer_id) + in_event (); + + zmq_assert (token == tx_timer_id); + out_event (); } #endif diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index bee416c..6d3ae31 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -58,9 +58,16 @@ namespace zmq // i_poll_events interface implementation. void in_event (); void out_event (); + void timer_event (int token); private: + // TX and RX timeout timer ID's. + enum {tx_timer_id = 0xa0, rx_timer_id = 0xa1}; + + // Timers are running. + bool has_tx_timer, has_rx_timer; + // Message encoder. encoder_t encoder; diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 3d55e69..6a6f549 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -417,18 +417,55 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) const int status = pgm_send (sock, data_, data_len_, &nbytes); - if (nbytes != data_len_) { - zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK); - zmq_assert (nbytes == 0); - } - // We have to write all data as one packet. - if (nbytes > 0) + if (nbytes > 0) { + zmq_assert (status == PGM_IO_STATUS_NORMAL); zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_); + } else { + zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK); + + if (status == PGM_IO_STATUS_RATE_LIMITED) + errno = ENOMEM; + else + errno = EBUSY; + } + + // Save return value. + last_tx_status = status; return nbytes; } +long zmq::pgm_socket_t::get_rx_timeout () +{ + if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED && last_rx_status != PGM_IO_STATUS_TIMER_PENDING) + return -1; + + struct timeval tv; + socklen_t optlen = sizeof (tv); + const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, last_rx_status == PGM_IO_STATUS_RATE_LIMITED ? PGM_RATE_REMAIN : PGM_TIME_REMAIN, &tv, &optlen); + zmq_assert (rc); + + const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + + return timeout; +} + +long zmq::pgm_socket_t::get_tx_timeout () +{ + if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED) + return -1; + + struct timeval tv; + socklen_t optlen = sizeof (tv); + const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen); + zmq_assert (rc); + + const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + + return timeout; +} + // Return max TSDU size without fragmentation from current PGM transport. size_t zmq::pgm_socket_t::get_max_tsdu_size () { @@ -457,7 +494,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) nbytes_processed = 0; pgm_msgv_processed = 0; errno = EAGAIN; - return -1; + return 0; } // If we have are going first time or if we have processed all pgm_msgv_t @@ -479,23 +516,19 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // Invalid parameters zmq_assert (status != PGM_IO_STATUS_ERROR); + last_rx_status = status; + // In a case when no ODATA/RDATA fired POLLIN event (SPM...) // pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING. if (status == PGM_IO_STATUS_TIMER_PENDING) { zmq_assert (nbytes_rec == 0); - struct timeval tv; - socklen_t optlen = sizeof (tv); - const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &optlen); - - zmq_assert (rc); - // In case if no RDATA/ODATA caused POLLIN 0 is // returned. nbytes_rec = 0; errno = EBUSY; - return -1; + return 0; } // Send SPMR, NAK, ACK is rate limited. @@ -503,15 +536,11 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) zmq_assert (nbytes_rec == 0); - struct timeval tv; - socklen_t optlen = sizeof (tv); - const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen); - // In case if no RDATA/ODATA caused POLLIN 0 is // returned. nbytes_rec = 0; - errno = EBUSY; - return -1; + errno = ENOMEM; + return 0; } // No peers and hence no incoming packets. @@ -523,7 +552,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // returned. nbytes_rec = 0; errno = EAGAIN; - return -1; + return 0; } // Data loss. @@ -548,6 +577,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) zmq_assert (pgm_msgv_processed <= pgm_msgv_len); } + // Zero byte payloads are valid in PGM, but not 0MQ protocol. zmq_assert (nbytes_rec > 0); // Only one APDU per pgm_msgv_t structure is allowed. @@ -587,6 +617,15 @@ void zmq::pgm_socket_t::process_upstream () // No data should be returned. zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK)); + + last_rx_status = status; + + if (status == PGM_IO_STATUS_TIMER_PENDING) + errno = EBUSY; + else if (status == PGM_IO_STATUS_RATE_LIMITED) + errno = ENOMEM; + else + errno = EAGAIN; } #endif diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index 334e42f..142fdcc 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -67,6 +67,9 @@ namespace zmq // Receive data from pgm socket. ssize_t receive (void **data_, const pgm_tsi_t **tsi_); + long get_rx_timeout (); + long get_tx_timeout (); + // POLLIN on sender side should mean NAK or SPMR receiving. // process_upstream function is used to handle such a situation. void process_upstream (); @@ -76,6 +79,8 @@ namespace zmq // OpenPGM transport pgm_sock_t* sock; + int last_rx_status, last_tx_status; + // Associated socket options. options_t options; -- cgit v1.2.3