From 73b765e4b497f6a505cbf88c524085fa0e58e59c Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 13 Dec 2009 09:11:08 +0100 Subject: PGM transport fixed --- src/encoder.hpp | 26 ++++--- src/pgm_receiver.cpp | 5 +- src/pgm_sender.cpp | 6 +- src/pgm_socket.cpp | 193 ++++++++++++++++++++------------------------------- src/pgm_socket.hpp | 5 +- src/zmq_engine.cpp | 4 +- 6 files changed, 105 insertions(+), 134 deletions(-) (limited to 'src') diff --git a/src/encoder.hpp b/src/encoder.hpp index 35c63b0..cb43f9f 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -50,12 +50,18 @@ namespace zmq free (buf); } - // The function returns a batch of binary data. If offset is not NULL, - // it is filled by offset of the first message in the batch. If there's - // no beginning of a message in the batch, offset is set to -1. - inline void get_buffer (unsigned char **data_, size_t *size_, + // The function returns a batch of binary data. The data + // are filled to a supplied buffer. If no buffer is supplied (data_ + // points to NULL) decoder object will provide buffer of its own. + // If offset is not NULL, it is filled by offset of the first message + // in the batch.If there's no beginning of a message in the batch, + // offset is set to -1. + inline void get_data (unsigned char **data_, size_t *size_, int *offset_ = NULL) { + unsigned char *buffer = !*data_ ? buf : *data_; + size_t buffersize = !*data_ ? bufsize : *size_; + size_t pos = 0; if (offset_) *offset_ = -1; @@ -67,7 +73,7 @@ namespace zmq // in the buffer. if (!to_write) { if (!(static_cast (this)->*next) ()) { - *data_ = buf; + *data_ = buffer; *size_ = pos; return; } @@ -91,7 +97,7 @@ namespace zmq // As a consequence, large messages being sent won't block // other engines running in the same I/O thread for excessive // amounts of time. - if (!pos && to_write >= bufsize) { + if (!pos && !*data_ && to_write >= buffersize) { *data_ = write_pos; *size_ = to_write; write_pos = NULL; @@ -100,13 +106,13 @@ namespace zmq } // Copy data to the buffer. If the buffer is full, return. - size_t to_copy = std::min (to_write, bufsize - pos); - memcpy (buf + pos, write_pos, to_copy); + size_t to_copy = std::min (to_write, buffersize - pos); + memcpy (buffer + pos, write_pos, to_copy); pos += to_copy; write_pos += to_copy; to_write -= to_copy; - if (pos == bufsize) { - *data_ = buf; + if (pos == buffersize) { + *data_ = buffer; *size_ = pos; return; } diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index aa7bab0..b71becc 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -194,7 +194,7 @@ void zmq::pgm_receiver_t::in_event () it->second.joined = true; // Create and connect decoder for joined peer. - it->second.decoder = new zmq_decoder_t; + it->second.decoder = new zmq_decoder_t (0); it->second.decoder->set_inout (inout); #ifdef ZMQ_HAVE_OPENPGM1 @@ -209,7 +209,8 @@ void zmq::pgm_receiver_t::in_event () if (nbytes > 0) { // Push all the data to the decoder. - it->second.decoder->write (raw_data, nbytes); + // TODO: process_buffer may not process entire buffer! + it->second.decoder->process_buffer (raw_data, nbytes); } } while (nbytes > 0); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 0a958c5..964e00b 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -49,6 +49,7 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, const options_t &options_, const char *session_name_) : io_object_t (parent_), + encoder (0), pgm_socket (false, options_), options (options_), session_name (session_name_), @@ -162,8 +163,9 @@ void zmq::pgm_sender_t::out_event () // First two bytes /sizeof (uint16_t)/ are used to store message // offset in following steps. - write_size = encoder.read (out_buffer + sizeof (uint16_t), - out_buffer_size - sizeof (uint16_t), &first_message_offset); + unsigned char *bf = out_buffer + sizeof (uint16_t); + write_size = out_buffer_size - sizeof (uint16_t); + encoder.get_data (&bf, &write_size, &first_message_offset); write_pos = 0; // If there are no data to write stop polling for output. diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 1f773d8..8400b83 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -56,7 +56,7 @@ #endif zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : - g_transport (NULL), + transport (NULL), options (options_), receiver (receiver_), port_number (0), @@ -120,7 +120,7 @@ int zmq::pgm_socket_t::open_transport (void) __FILE__, __LINE__); // Can not open transport before destroying old one. - zmq_assert (g_transport == NULL); + zmq_assert (transport == NULL); // Zero counter used in msgrecv. nbytes_rec = 0; @@ -222,13 +222,13 @@ int zmq::pgm_socket_t::open_transport (void) } #ifdef ZMQ_HAVE_OPENPGM1 - rc = pgm_transport_create (&g_transport, &gsi, 0, port_number, &recv_gsr, + rc = pgm_transport_create (&transport, &gsi, 0, port_number, &recv_gsr, 1, &send_gsr); if (rc != 0) { return -1; } #elif defined ZMQ_HAVE_OPENPGM2 - if (!pgm_transport_create (&g_transport, res, &pgm_error)) { + if (!pgm_transport_create (&transport, res, &pgm_error)) { pgm_if_free_transport_info (res); // TODO: tranlate errors from glib into errnos. errno = EINVAL; @@ -243,14 +243,14 @@ int zmq::pgm_socket_t::open_transport (void) // Common parameters for receiver and sender. // Set maximum transport protocol data unit size (TPDU). - rc = pgm_transport_set_max_tpdu (g_transport, pgm_max_tpdu); + rc = pgm_transport_set_max_tpdu (transport, pgm_max_tpdu); if (rc != pgm_ok) { errno = EINVAL; return -1; } // Set maximum number of network hops to cross. - rc = pgm_transport_set_hops (g_transport, 16); + rc = pgm_transport_set_hops (transport, 16); if (rc != pgm_ok) { errno = EINVAL; return -1; @@ -258,85 +258,60 @@ int zmq::pgm_socket_t::open_transport (void) #ifdef ZMQ_HAVE_OPENPGM2 // Set nonblocking send/recv sockets. - if (!pgm_transport_set_nonblocking (g_transport, true)) { + if (!pgm_transport_set_nonblocking (transport, true)) { errno = EINVAL; return -1; } #endif - // Receiver transport. if (receiver) { + + // Receiver transport. // Set transport->can_send_data = FALSE. // Note that NAKs are still generated by the transport. #if defined ZMQ_HAVE_OPENPGM1 - rc = pgm_transport_set_recv_only (g_transport, false); + rc = pgm_transport_set_recv_only (transport, false); #elif defined ZMQ_HAVE_OPENPGM2 - rc = pgm_transport_set_recv_only (g_transport, true, false); + rc = pgm_transport_set_recv_only (transport, true, false); #endif - if (rc != pgm_ok) { - errno = EINVAL; - return -1; - } + zmq_assert (rc == pgm_ok); // Set NAK transmit back-off interval [us]. - rc = pgm_transport_set_nak_bo_ivl (g_transport, 50*1000); - if (rc != pgm_ok) { - errno = EINVAL; - return -1; - } + rc = pgm_transport_set_nak_bo_ivl (transport, 50 * 1000); + zmq_assert (rc == pgm_ok); // Set timeout before repeating NAK [us]. - rc = pgm_transport_set_nak_rpt_ivl (g_transport, 200*1000); - if (rc != pgm_ok) { - errno = EINVAL; - return -1; - } + rc = pgm_transport_set_nak_rpt_ivl (transport, 200 * 1000); + zmq_assert (rc == pgm_ok); // Set timeout for receiving RDATA. - rc = pgm_transport_set_nak_rdata_ivl (g_transport, 200*1000); - if (rc != pgm_ok) { - errno = EINVAL; - return -1; - } + rc = pgm_transport_set_nak_rdata_ivl (transport, 200 * 1000); + zmq_assert (rc == pgm_ok); // Set retries for NAK without NCF/DATA (NAK_DATA_RETRIES). - rc = pgm_transport_set_nak_data_retries (g_transport, 5); - if (rc != pgm_ok) { - errno = EINVAL; - return -1; - } + rc = pgm_transport_set_nak_data_retries (transport, 5); + zmq_assert (rc == pgm_ok); // Set retries for NCF after NAK (NAK_NCF_RETRIES). - rc = pgm_transport_set_nak_ncf_retries (g_transport, 2); - if (rc != pgm_ok) { - errno = EINVAL; - return -1; - } + rc = pgm_transport_set_nak_ncf_retries (transport, 2); + zmq_assert (rc == pgm_ok); // Set timeout for removing a dead peer [us]. - rc = pgm_transport_set_peer_expiry (g_transport, 5*8192*1000); - if (rc != pgm_ok) { - errno = EINVAL; - return -1; - } + rc = pgm_transport_set_peer_expiry (transport, 5 * 8192 * 1000); + zmq_assert (rc == pgm_ok); // Set expiration time of SPM Requests [us]. - rc = pgm_transport_set_spmr_expiry (g_transport, 25*1000); - if (rc != pgm_ok) { - errno = EINVAL; - return -1; - } + rc = pgm_transport_set_spmr_expiry (transport, 25 * 1000); + zmq_assert (rc == pgm_ok); // Set the size of the receive window. - // - // data rate [B/s] (options.rate is kb/s). + // Data rate is in [B/s]. options.rate is in [kb/s]. if (options.rate <= 0) { errno = EINVAL; return -1; } - - rc = pgm_transport_set_rxw_max_rte (g_transport, + rc = pgm_transport_set_rxw_max_rte (transport, options.rate * 1000 / 8); if (rc != pgm_ok) { errno = EINVAL; @@ -348,32 +323,27 @@ int zmq::pgm_socket_t::open_transport (void) errno = EINVAL; return -1; } - - rc = pgm_transport_set_rxw_secs (g_transport, options.recovery_ivl); + rc = pgm_transport_set_rxw_secs (transport, options.recovery_ivl); if (rc != pgm_ok) { errno = EINVAL; return -1; } - // Sender transport. } else { + // Sender transport. + // Set transport->can_recv = FALSE, waiting_pipe will not be read. - rc = pgm_transport_set_send_only (g_transport, TRUE); - if (rc != pgm_ok) { - errno = EINVAL; - return -1; - } + rc = pgm_transport_set_send_only (transport, TRUE); + zmq_assert (rc == pgm_ok); // Set the size of the send window. - // - // data rate [B/s] (options.rate is kb/s). + // Data rate is in [B/s] options.rate is in [kb/s]. if (options.rate <= 0) { errno = EINVAL; return -1; } - - rc = pgm_transport_set_txw_max_rte (g_transport, + rc = pgm_transport_set_txw_max_rte (transport, options.rate * 1000 / 8); if (rc != pgm_ok) { errno = EINVAL; @@ -385,8 +355,7 @@ int zmq::pgm_socket_t::open_transport (void) errno = EINVAL; return -1; } - - rc = pgm_transport_set_txw_secs (g_transport, options.recovery_ivl); + rc = pgm_transport_set_txw_secs (transport, options.recovery_ivl); if (rc != pgm_ok) { errno = EINVAL; return -1; @@ -399,7 +368,7 @@ int zmq::pgm_socket_t::open_transport (void) int to_preallocate = options.recovery_ivl * (options.rate * 1000 / 8) / (pgm_max_tpdu - 40 - 20); - rc = pgm_transport_set_txw_preallocate (g_transport, to_preallocate); + rc = pgm_transport_set_txw_preallocate (transport, to_preallocate); if (rc != 0) { errno = EINVAL; return -1; @@ -410,48 +379,38 @@ int zmq::pgm_socket_t::open_transport (void) #endif // Set interval of background SPM packets [us]. - rc = pgm_transport_set_ambient_spm (g_transport, 8192 * 1000); - if (rc != pgm_ok) { - errno = EINVAL; - return -1; - } + rc = pgm_transport_set_ambient_spm (transport, 8192 * 1000); + zmq_assert (rc == pgm_ok); // Set intervals of data flushing SPM packets [us]. guint spm_heartbeat[] = {4 * 1000, 4 * 1000, 8 * 1000, 16 * 1000, 32 * 1000, 64 * 1000, 128 * 1000, 256 * 1000, 512 * 1000, 1024 * 1000, 2048 * 1000, 4096 * 1000, 8192 * 1000}; - - rc = pgm_transport_set_heartbeat_spm (g_transport, spm_heartbeat, + rc = pgm_transport_set_heartbeat_spm (transport, spm_heartbeat, G_N_ELEMENTS(spm_heartbeat)); - if (rc != pgm_ok) { - errno = EINVAL; - return -1; - } + zmq_assert (rc == pgm_ok); } // Enable multicast loopback. if (options.use_multicast_loop) { - rc = pgm_transport_set_multicast_loop (g_transport, true); - if (rc != pgm_ok) { - errno = EINVAL; - return -1; - } + rc = pgm_transport_set_multicast_loop (transport, true); + zmq_assert (rc == pgm_ok); } // Bind a transport to the specified network devices. #ifdef ZMQ_HAVE_OPENPGM1 - rc = pgm_transport_bind (g_transport); + rc = pgm_transport_bind (transport); if (rc != 0) { return -1; } #elif defined ZMQ_HAVE_OPENPGM2 - if (!pgm_transport_bind (g_transport, &pgm_error)) { + if (!pgm_transport_bind (transport, &pgm_error)) { // TODO: tranlate errors from glib into errnos. return -1; } #endif - zmq_log (1, "PGM transport binded, %s(%i)\n", __FILE__, __LINE__); + zmq_log (1, "PGM transport bound, %s(%i)\n", __FILE__, __LINE__); return 0; } @@ -463,18 +422,18 @@ zmq::pgm_socket_t::~pgm_socket_t () delete [] pgm_msgv; } - if (g_transport) + if (transport) close_transport (); } void zmq::pgm_socket_t::close_transport (void) { - // g_transport has to be valid. - zmq_assert (g_transport); + // transport has to be valid. + zmq_assert (transport); - pgm_transport_destroy (g_transport, TRUE); + pgm_transport_destroy (transport, TRUE); - g_transport = NULL; + transport = NULL; } // Get receiver fds. recv_fd is from transport->recv_sock @@ -492,7 +451,7 @@ int zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_, memset (fds, '\0', fds_array_size * sizeof (fds)); // Retrieve pollfds from pgm_transport. - int rc = pgm_transport_poll_info (g_transport, fds, &fds_array_size, + int rc = pgm_transport_poll_info (transport, fds, &fds_array_size, POLLIN); // pgm_transport_poll_info has to return 2 pollfds for POLLIN. @@ -508,15 +467,15 @@ int zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_, #elif defined ZMQ_HAVE_OPENPGM2 // recv_sock2 should not be used - check it. - zmq_assert (g_transport->recv_sock2 == -1); + zmq_assert (transport->recv_sock2 == -1); // Check if transport can receive data and can not send. - zmq_assert (g_transport->can_recv_data); - zmq_assert (!g_transport->can_send_data); + zmq_assert (transport->can_recv_data); + zmq_assert (!transport->can_send_data); // Take FDs directly from transport. - *receive_fd_ = pgm_transport_get_recv_fd (g_transport); - *waiting_pipe_fd_ = pgm_transport_get_pending_fd (g_transport); + *receive_fd_ = pgm_transport_get_recv_fd (transport); + *waiting_pipe_fd_ = pgm_transport_get_pending_fd (transport); #endif return pgm_receiver_fd_count; @@ -541,7 +500,7 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, memset (fds, '\0', fds_array_size * sizeof (fds)); // Retrieve pollfds from pgm_transport. - int rc = pgm_transport_poll_info (g_transport, fds, &fds_array_size, + int rc = pgm_transport_poll_info (transport, fds, &fds_array_size, POLLOUT | POLLIN); // pgm_transport_poll_info has to return one pollfds for POLLOUT and @@ -560,16 +519,16 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, zmq_assert (rdata_notify_fd_); // recv_sock2 should not be used - check it. - zmq_assert (g_transport->recv_sock2 == -1); + zmq_assert (transport->recv_sock2 == -1); // Check if transport can send data and can not receive. - zmq_assert (g_transport->can_send_data); - zmq_assert (!g_transport->can_recv_data); + zmq_assert (transport->can_send_data); + zmq_assert (!transport->can_recv_data); // Take FDs directly from transport. - *receive_fd_ = pgm_transport_get_recv_fd (g_transport); - *rdata_notify_fd_ = pgm_transport_get_repair_fd (g_transport); - *send_fd_ = pgm_transport_get_send_fd (g_transport); + *receive_fd_ = pgm_transport_get_recv_fd (transport); + *rdata_notify_fd_ = pgm_transport_get_repair_fd (transport); + *send_fd_ = pgm_transport_get_send_fd (transport); #endif return pgm_sender_fd_count; @@ -584,7 +543,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) ssize_t nbytes = 0; iovec iov = {data_,data_len_}; - nbytes = pgm_transport_send_packetv (g_transport, &iov, 1, + nbytes = pgm_transport_send_packetv (transport, &iov, 1, MSG_DONTWAIT | MSG_WAITALL, true); zmq_assert (nbytes != -EINVAL); @@ -601,7 +560,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) size_t nbytes = 0; - PGMIOStatus status = pgm_send (g_transport, data_, data_len_, &nbytes); + PGMIOStatus status = pgm_send (transport, data_, data_len_, &nbytes); if (nbytes != data_len_) { zmq_log (1, "status %i, data_len %i, wrote %iB, %s(%i)\n", @@ -628,7 +587,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) // Return max TSDU size without fragmentation from current PGM transport. size_t zmq::pgm_socket_t::get_max_tsdu_size (void) { - return (size_t)pgm_transport_max_tsdu (g_transport, false); + return (size_t)pgm_transport_max_tsdu (transport, false); } // Returns how many APDUs are needed to fill reading buffer. @@ -661,7 +620,7 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_) #if defined ZMQ_HAVE_OPENPGM1 // Allocate one packet in tx window. - return pgm_packetv_alloc (g_transport, false); + return pgm_packetv_alloc (transport, false); #elif defined ZMQ_HAVE_OPENPGM2 // Allocate buffer. unsigned char *apdu_buff = new unsigned char [*size_]; @@ -675,7 +634,7 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_) void zmq::pgm_socket_t::free_buffer (void *data_) { #if defined ZMQ_HAVE_OPENPGM1 - pgm_packetv_free1 (g_transport, data_, false); + pgm_packetv_free1 (transport, data_, false); #elif defined ZMQ_HAVE_OPENPGM2 delete [] (unsigned char*) data_; #endif @@ -714,7 +673,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // Receive a vector of Application Protocol Domain Unit's (APDUs) // from the transport. #ifdef ZMQ_HAVE_OPENPGM1 - nbytes_rec = pgm_transport_recvmsgv (g_transport, pgm_msgv, + nbytes_rec = pgm_transport_recvmsgv (transport, pgm_msgv, pgm_msgv_len, MSG_DONTWAIT); // In a case when no ODATA/RDATA fired POLLIN event (SPM...) @@ -731,10 +690,10 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) if (nbytes_rec == -1 && errno == ECONNRESET) { // Save lost data TSI. - *tsi_ = &g_transport->lost_data_tsi; + *tsi_ = &transport->lost_data_tsi; zmq_log (1, "Data loss detected %s, %s(%i)\n", - pgm_print_tsi (&g_transport->lost_data_tsi), __FILE__, __LINE__); + pgm_print_tsi (&transport->lost_data_tsi), __FILE__, __LINE__); nbytes_rec = 0; @@ -751,7 +710,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) #elif defined ZMQ_HAVE_OPENPGM2 GError *pgm_error = NULL; - const PGMIOStatus status = pgm_recvmsgv (g_transport, pgm_msgv, + const PGMIOStatus status = pgm_recvmsgv (transport, pgm_msgv, pgm_msgv_len, MSG_DONTWAIT, &nbytes_rec, &pgm_error); if (nbytes_rec > 0) { @@ -777,7 +736,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n", status, (int) nbytes_rec, __FILE__, __LINE__); - pgm_peer_t* peer = (pgm_peer_t*) g_transport->peers_pending->data; + pgm_peer_t* peer = (pgm_peer_t*) transport->peers_pending->data; // Save lost data TSI. *tsi_ = &peer->tsi; @@ -855,7 +814,7 @@ void zmq::pgm_socket_t::process_upstream (void) // We acctually do not want to read any data here we are going to // process NAK. - dummy_bytes = pgm_transport_recvmsgv (g_transport, &dummy_msg, + dummy_bytes = pgm_transport_recvmsgv (transport, &dummy_msg, 1, MSG_DONTWAIT); // No data should be returned. @@ -865,7 +824,7 @@ void zmq::pgm_socket_t::process_upstream (void) size_t dummy_bytes = 0; GError *pgm_error = NULL; - PGMIOStatus status = pgm_recvmsgv (g_transport, &dummy_msg, + PGMIOStatus status = pgm_recvmsgv (transport, &dummy_msg, 1, MSG_DONTWAIT, &dummy_bytes, &pgm_error); zmq_log (1, "upstream status %i, nbytes %i, %s(%i)\n", diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index 71fa9c6..6c67306 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -62,7 +62,8 @@ namespace zmq // Get sender and receiver fds and store it to user allocated // memory. Receive fd is used to process NAKs from peers. - int get_sender_fds (int *send_fd_, int *receive_fd_, int *rdata_notify_fd_ = NULL); + int get_sender_fds (int *send_fd_, int *receive_fd_, + int *rdata_notify_fd_ = NULL); // Send data as one APDU, transmit window owned memory. size_t send (unsigned char *data_, size_t data_len_); @@ -83,7 +84,7 @@ namespace zmq protected: // OpenPGM transport - pgm_transport_t* g_transport; + pgm_transport_t* transport; private: diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index d474f4e..c04f29b 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -106,7 +106,9 @@ void zmq::zmq_engine_t::out_event () { // If write buffer is empty, try to read new data from the encoder. if (!outsize) { - encoder.get_buffer (&outpos, &outsize); + + outpos = NULL; + encoder.get_data (&outpos, &outsize); // If there is no data to send, stop polling for output. if (outsize == 0) { -- cgit v1.2.3