From 5852db451a76905336601c5ba3e4f33006f007fb Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 28 Dec 2009 11:51:06 +0100 Subject: PGM code cleanup --- src/pgm_receiver.cpp | 148 +++++++++++++++++------------------------- src/pgm_receiver.hpp | 13 ++-- src/pgm_sender.cpp | 94 ++++++++++----------------- src/pgm_sender.hpp | 16 +---- src/pgm_socket.cpp | 178 +++++++++++++++------------------------------------ src/pgm_socket.hpp | 37 ++--------- 6 files changed, 159 insertions(+), 327 deletions(-) diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 1d4d695..0eb92d2 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -27,9 +27,6 @@ #include "windows.hpp" #endif -#include -#include - #include "pgm_receiver.hpp" #include "err.hpp" #include "stdint.hpp" @@ -58,20 +55,12 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) void zmq::pgm_receiver_t::plug (i_inout *inout_) { - // Allocate 2 fds one for socket second for waiting pipe. + // Retrieve PGM fds and start polling. int socket_fd; int waiting_pipe_fd; - - // Fill socket_fd and waiting_pipe_fd from PGM transport pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd); - - // Add socket_fd into poller. socket_handle = add_fd (socket_fd); - - // Add waiting_pipe_fd into poller. pipe_handle = add_fd (waiting_pipe_fd); - - // Set POLLIN for both handlers. set_pollin (pipe_handle); set_pollin (socket_handle); @@ -81,15 +70,16 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_) void zmq::pgm_receiver_t::unplug () { // Delete decoders. - for (peer_t::iterator it = peers.begin (); it != peers.end (); it++) { + for (peers_t::iterator it = peers.begin (); it != peers.end (); it++) { if (it->second.decoder != NULL) delete it->second.decoder; } - peers.clear (); + // Stop polling. rm_fd (socket_handle); rm_fd (pipe_handle); + inout = NULL; } @@ -98,101 +88,77 @@ void zmq::pgm_receiver_t::revive () zmq_assert (false); } -// POLLIN event from socket or waiting_pipe. void zmq::pgm_receiver_t::in_event () { - // Iterator to peers map. - peer_t::iterator it; - - // Data from PGM socket. - unsigned char *raw_data = NULL; + // Read data from the underlying pgm_socket. + unsigned char *data = NULL; const pgm_tsi_t *tsi = NULL; - ssize_t nbytes = 0; - - do { - - // Read data from underlying pgm_socket. - nbytes = pgm_socket.receive ((void**) &raw_data, &tsi); - - // No ODATA or RDATA. - if (!nbytes) - break; + ssize_t received = pgm_socket.receive ((void**) &data, &tsi); - // Fid TSI in peers list. - it = peers.find (*tsi); + // No data to process. This may happen if the packet received is + // neither ODATA nor ODATA. + if (received == 0) + return; - // Data loss. - if (nbytes == -1) { + // Find the peer based on its TSI. + peers_t::iterator it = peers.find (*tsi); - zmq_assert (it != peers.end ()); - - // Delete decoder and set joined to false. - it->second.joined = false; - - if (it->second.decoder != NULL) { - delete it->second.decoder; - it->second.decoder = NULL; - } - - break; + // Data loss. Delete decoder and mark the peer as disjoint. + if (received == -1) { + zmq_assert (it != peers.end ()); + it->second.joined = false; + if (it->second.decoder != NULL) { + delete it->second.decoder; + it->second.decoder = NULL; } + return; + } - // Read offset of the fist message in current APDU. - zmq_assert ((size_t) nbytes >= sizeof (uint16_t)); - uint16_t apdu_offset = get_uint16 (raw_data); + // New peer. Add it to the list of know but unjoint peers. + if (it == peers.end ()) { + peer_info_t peer_info = {false, NULL}; + it = peers.insert (std::make_pair (*tsi, peer_info)).first; + } - // Shift raw_data & decrease nbytes by the first message offset - // information (sizeof uint16_t). - raw_data += sizeof (uint16_t); - nbytes -= sizeof (uint16_t); + // Read the offset of the fist message in the current packet. + zmq_assert ((size_t) received >= sizeof (uint16_t)); + uint16_t offset = get_uint16 (data); + data += sizeof (uint16_t); + received -= sizeof (uint16_t); - // New peer. - if (it == peers.end ()) { - peer_info_t peer_info = {false, NULL}; - it = peers.insert (std::make_pair (*tsi, peer_info)).first; - } + // Join the stream if needed. + if (!it->second.joined) { - // There is not beginning of the message in current APDU and we - // are not joined jet -> throwing data. - if (apdu_offset == 0xFFFF && !it->second.joined) { - break; - } + // There is no beginning of the message in current packet. + // Ignore the data. + if (offset == 0xffff) + return; - // Now is the possibility to join the stream. - if (!it->second.joined) { - - zmq_assert (apdu_offset <= nbytes); - zmq_assert (it->second.decoder == NULL); + zmq_assert (offset <= received); + zmq_assert (it->second.decoder == NULL); - // We have to move data to the begining of the first message. - raw_data += apdu_offset; - nbytes -= apdu_offset; + // We have to move data to the begining of the first message. + data += offset; + received -= offset; - // Joined the stream. - it->second.joined = true; + // Mark the stream as joined. + it->second.joined = true; - // Create and connect decoder for joined peer. - it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0); - it->second.decoder->set_inout (inout); - } - - if (nbytes > 0) { - - // Push all the data to the decoder. - // TODO: process_buffer may not process entire buffer! - it->second.decoder->process_buffer (raw_data, nbytes); - } + // Create and connect decoder for the peer. + it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0); + it->second.decoder->set_inout (inout); + } - } while (nbytes > 0); + if (received) { - // Flush any messages decoder may have produced to the dispatcher. - inout->flush (); - -} + // Push all the data to the decoder. + // TODO: process_buffer may not process entire buffer! + size_t processed = it->second.decoder->process_buffer (data, received); + zmq_assert (processed == received); -void zmq::pgm_receiver_t::out_event () -{ - zmq_assert (false); + // Flush any messages decoder may have produced. + inout->flush (); + } } #endif diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 91169f4..6fadbc3 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -29,7 +29,7 @@ #endif #include -#include +#include #include "io_object.hpp" #include "i_engine.hpp" @@ -45,8 +45,6 @@ namespace zmq public: - // Creates gm_engine. Underlying PGM connection is initialised - // using network_ parameter. pgm_receiver_t (class io_thread_t *parent_, const options_t &options_); ~pgm_receiver_t (); @@ -59,11 +57,12 @@ namespace zmq // i_poll_events interface implementation. void in_event (); - void out_event (); private: - // Map to hold TSI, joined and decoder for each peer. + // If joined is true we are already getting messages from the peer. + // It it's false, we are getting data but still we haven't seen + // beginning of a message. struct peer_info_t { bool joined; @@ -84,8 +83,8 @@ namespace zmq } }; - typedef std::map peer_t; - peer_t peers; + typedef std::map peers_t; + peers_t peers; // PGM socket. pgm_socket_t pgm_socket; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 880bb09..0d87ee0 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -25,12 +25,13 @@ #include "windows.hpp" #endif -#include +#include #include "io_thread.hpp" #include "pgm_sender.hpp" #include "err.hpp" #include "wire.hpp" +#include "stdint.hpp" zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, const options_t &options_) : @@ -38,18 +39,21 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, encoder (0, false), pgm_socket (false, options_), options (options_), - inout (NULL), out_buffer (NULL), out_buffer_size (0), - write_size (0), - write_pos (0), - first_message_offset (-1) + write_size (0) { } int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) { - return pgm_socket.init (udp_encapsulation_, network_); + int rc = pgm_socket.init (udp_encapsulation_, network_); + if (rc != 0) + return rc; + + out_buffer_size = pgm_socket.get_max_tsdu_size (); + out_buffer = (unsigned char*) malloc (out_buffer_size); + zmq_assert (out_buffer); } void zmq::pgm_sender_t::plug (i_inout *inout_) @@ -61,17 +65,11 @@ void zmq::pgm_sender_t::plug (i_inout *inout_) encoder.set_inout (inout_); - // Fill fds from PGM transport. - pgm_socket.get_sender_fds - (&downlink_socket_fd, &uplink_socket_fd, &rdata_notify_fd); - - // Add downlink_socket_fd into poller. + // Fill fds from PGM transport and add them to the poller. + pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, + &rdata_notify_fd); handle = add_fd (downlink_socket_fd); - - // Add uplink_socket_fd into the poller. uplink_handle = add_fd (uplink_socket_fd); - - // Add rdata_notify_fd into the poller. rdata_notify_handle = add_fd (rdata_notify_fd); // Set POLLIN. We wont never want to stop polling for uplink = we never @@ -81,8 +79,6 @@ void zmq::pgm_sender_t::plug (i_inout *inout_) // Set POLLOUT for downlink_socket_handle. set_pollout (handle); - - inout = inout_; } void zmq::pgm_sender_t::unplug () @@ -91,7 +87,6 @@ void zmq::pgm_sender_t::unplug () rm_fd (uplink_handle); rm_fd (rdata_notify_handle); encoder.set_inout (NULL); - inout = NULL; } void zmq::pgm_sender_t::revive () @@ -103,14 +98,14 @@ void zmq::pgm_sender_t::revive () zmq::pgm_sender_t::~pgm_sender_t () { if (out_buffer) { - pgm_socket.free_buffer (out_buffer); + free (out_buffer); out_buffer = NULL; } } -// In event on sender side means NAK or SPMR receiving from some peer. void zmq::pgm_sender_t::in_event () { + // In event on sender side means NAK or SPMR receiving from some peer. pgm_socket.process_upstream (); } @@ -118,55 +113,36 @@ void zmq::pgm_sender_t::out_event () { // POLLOUT event from send socket. If write buffer is empty, // try to read new data from the encoder. - if (write_pos == write_size) { - - // Get buffer if we do not have already one. - if (!out_buffer) { - out_buffer = (unsigned char*) - pgm_socket.get_buffer (&out_buffer_size); - } + if (write_size == 0) { - assert (out_buffer_size > 0); - - // First two bytes /sizeof (uint16_t)/ are used to store message - // offset in following steps. + // First two bytes (sizeof uint16_t) are used to store message + // offset in following steps. Note that by passing our buffer to + // the get data function we prevent it from returning its own buffer. unsigned char *bf = out_buffer + sizeof (uint16_t); - write_size = out_buffer_size - sizeof (uint16_t); - encoder.get_data (&bf, &write_size, &first_message_offset); - write_pos = 0; + size_t bfsz = out_buffer_size - sizeof (uint16_t); + int offset = -1; + encoder.get_data (&bf, &bfsz, &offset); // If there are no data to write stop polling for output. - if (!write_size) { + if (!bfsz) { reset_pollout (handle); - } else { - // Addning uint16_t for offset in a case when encoder returned > 0B. - write_size += sizeof (uint16_t); + return; } - } - - // If there are any data to write, write them into the socket. - // Note that all data has to written in one write_one_pkt_with_offset call. - if (write_pos < write_size) { - size_t nbytes = write_one_pkt_with_offset (out_buffer + write_pos, - write_size - write_pos, (uint16_t) first_message_offset); - - // We can write either all data or 0 which means rate limit reached. - zmq_assert (write_size - write_pos == nbytes || nbytes == 0); - write_pos += nbytes; + // Put offset information in the buffer. + write_size = bfsz + sizeof (uint16_t); + put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset); } -} -size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_, - size_t size_, uint16_t offset_) -{ - // Put offset information in the buffer. - put_uint16 (data_, offset_); - - // Send data. - size_t nbytes = pgm_socket.send (data_, size_); + // Send the data. + size_t nbytes = pgm_socket.send (out_buffer, write_size); - return nbytes; + // We can write either all data or 0 which means rate limit reached. + if (nbytes == write_size) + write_size = 0; + else + zmq_assert (nbytes == 0); } + #endif diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 30b545d..e280843 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -42,6 +42,7 @@ namespace zmq { public: + pgm_sender_t (class io_thread_t *parent_, const options_t &options_); ~pgm_sender_t (); @@ -58,12 +59,6 @@ namespace zmq private: - // Send one APDU with first message offset information. - // Note that first 2 bytes in data_ are used to store the offset_ - // and thus user data has to start at data_ + sizeof (uint16_t). - size_t write_one_pkt_with_offset (unsigned char *data_, size_t size_, - uint16_t offset_); - // Message encoder. zmq_encoder_t encoder; @@ -78,20 +73,15 @@ namespace zmq handle_t uplink_handle; handle_t rdata_notify_handle; - // Parent session. - i_inout *inout; - // Output buffer from pgm_socket. unsigned char *out_buffer; // Output buffer size. size_t out_buffer_size; + // Number of bytes in the buffer to be written to the socket. + // If zero, there are no data to be sent. size_t write_size; - size_t write_pos; - - // Offset of the first mesage in data chunk taken from encoder. - int first_message_offset; pgm_sender_t (const pgm_sender_t&); void operator = (const pgm_sender_t&); diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index c3802b4..72698cf 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -31,34 +31,32 @@ #define CONFIG_HAVE_POLL #endif -#include +#include #include -#include #include "options.hpp" #include "pgm_socket.hpp" #include "config.hpp" #include "err.hpp" #include "uuid.hpp" +#include "stdint.hpp" zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : transport (NULL), options (options_), receiver (receiver_), - port_number (0), - udp_encapsulation (false), pgm_msgv (NULL), nbytes_rec (0), nbytes_processed (0), pgm_msgv_processed (0), pgm_msgv_len (0) { - } int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) { - udp_encapsulation = udp_encapsulation_; + // Can not open transport before destroying old one. + zmq_assert (transport == NULL); // Parse port number. const char *port_delim = strchr (network_, ':'); @@ -67,44 +65,21 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) return -1; } - port_number = atoi (port_delim + 1); + uint16_t port_number = atoi (port_delim + 1); + char network [256]; if (port_delim - network_ >= (int) sizeof (network) - 1) { errno = EINVAL; return -1; } - memset (network, '\0', sizeof (network)); memcpy (network, network_, port_delim - network_); - - // Open PGM transport. - int rc = open_transport (); - if (rc != 0) - return -1; - - // For receiver transport preallocate pgm_msgv array. - // in_batch_size configured in confing.hpp - if (receiver) { - pgm_msgv_len = get_max_apdu_at_once (in_batch_size); - // TODO: use malloc instead of new - pgm_msgv = new pgm_msgv_t [pgm_msgv_len]; - } - - return 0; -} - -int zmq::pgm_socket_t::open_transport () -{ - // Can not open transport before destroying old one. - zmq_assert (transport == NULL); - + // Zero counter used in msgrecv. nbytes_rec = 0; nbytes_processed = 0; pgm_msgv_processed = 0; - // TODO: Converting bool to int? Not nice. - int pgm_ok = true; GError *pgm_error = NULL; // Init PGM transport. @@ -135,8 +110,7 @@ int zmq::pgm_socket_t::open_transport () } rc = pgm_gsi_create_from_string (&gsi, gsi_base.c_str (), -1); - - if (rc != pgm_ok) { + if (rc != TRUE) { errno = EINVAL; return -1; } @@ -167,7 +141,7 @@ int zmq::pgm_socket_t::open_transport () res->ti_dport = port_number; // If we are using UDP encapsulation update gsr or res. - if (udp_encapsulation) { + if (udp_encapsulation_) { res->ti_udp_encap_ucast_port = port_number; res->ti_udp_encap_mcast_port = port_number; } @@ -192,14 +166,14 @@ int zmq::pgm_socket_t::open_transport () // Set maximum transport protocol data unit size (TPDU). rc = pgm_transport_set_max_tpdu (transport, pgm_max_tpdu); - if (rc != pgm_ok) { + if (rc != TRUE) { errno = EINVAL; return -1; } // Set maximum number of network hops to cross. rc = pgm_transport_set_hops (transport, 16); - if (rc != pgm_ok) { + if (rc != TRUE) { errno = EINVAL; return -1; } @@ -212,46 +186,45 @@ int zmq::pgm_socket_t::open_transport () if (receiver) { - // Receiver transport. + // Receiver transport. - // Set transport->can_send_data = FALSE. // Note that NAKs are still generated by the transport. rc = pgm_transport_set_recv_only (transport, true, false); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); if (options.rcvbuf) { rc = pgm_transport_set_rcvbuf (transport, (int) options.rcvbuf); - if (rc != pgm_ok) + if (rc != TRUE) return -1; } // Set NAK transmit back-off interval [us]. rc = pgm_transport_set_nak_bo_ivl (transport, 50 * 1000); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); // Set timeout before repeating NAK [us]. rc = pgm_transport_set_nak_rpt_ivl (transport, 200 * 1000); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); // Set timeout for receiving RDATA. rc = pgm_transport_set_nak_rdata_ivl (transport, 200 * 1000); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); // Set retries for NAK without NCF/DATA (NAK_DATA_RETRIES). rc = pgm_transport_set_nak_data_retries (transport, 5); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); // Set retries for NCF after NAK (NAK_NCF_RETRIES). rc = pgm_transport_set_nak_ncf_retries (transport, 2); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); // Set timeout for removing a dead peer [us]. rc = pgm_transport_set_peer_expiry (transport, 5 * 8192 * 1000); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); // Set expiration time of SPM Requests [us]. rc = pgm_transport_set_spmr_expiry (transport, 25 * 1000); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); // Set the size of the receive window. // Data rate is in [B/s]. options.rate is in [kb/s]. @@ -261,7 +234,7 @@ int zmq::pgm_socket_t::open_transport () } rc = pgm_transport_set_rxw_max_rte (transport, options.rate * 1000 / 8); - if (rc != pgm_ok) { + if (rc != TRUE) { errno = EINVAL; return -1; } @@ -272,7 +245,7 @@ int zmq::pgm_socket_t::open_transport () return -1; } rc = pgm_transport_set_rxw_secs (transport, options.recovery_ivl); - if (rc != pgm_ok) { + if (rc != TRUE) { errno = EINVAL; return -1; } @@ -281,13 +254,13 @@ int zmq::pgm_socket_t::open_transport () // Sender transport. - // Set transport->can_recv = FALSE, waiting_pipe will not be read. + // Waiting pipe won't be read. rc = pgm_transport_set_send_only (transport, TRUE); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); if (options.sndbuf) { rc = pgm_transport_set_sndbuf (transport, (int) options.sndbuf); - if (rc != pgm_ok) + if (rc != TRUE) return -1; } @@ -299,7 +272,7 @@ int zmq::pgm_socket_t::open_transport () } rc = pgm_transport_set_txw_max_rte (transport, options.rate * 1000 / 8); - if (rc != pgm_ok) { + if (rc != TRUE) { errno = EINVAL; return -1; } @@ -310,14 +283,14 @@ int zmq::pgm_socket_t::open_transport () return -1; } rc = pgm_transport_set_txw_secs (transport, options.recovery_ivl); - if (rc != pgm_ok) { + if (rc != TRUE) { errno = EINVAL; return -1; } // Set interval of background SPM packets [us]. rc = pgm_transport_set_ambient_spm (transport, 8192 * 1000); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); // Set intervals of data flushing SPM packets [us]. guint spm_heartbeat[] = {4 * 1000, 4 * 1000, 8 * 1000, 16 * 1000, @@ -325,13 +298,13 @@ int zmq::pgm_socket_t::open_transport () 1024 * 1000, 2048 * 1000, 4096 * 1000, 8192 * 1000}; rc = pgm_transport_set_heartbeat_spm (transport, spm_heartbeat, G_N_ELEMENTS(spm_heartbeat)); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); } // Enable multicast loopback. if (options.use_multicast_loop) { rc = pgm_transport_set_multicast_loop (transport, true); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); } // Bind a transport to the specified network devices. @@ -354,28 +327,28 @@ int zmq::pgm_socket_t::open_transport () zmq_assert (false); } + // For receiver transport preallocate pgm_msgv array. + // TODO: ? + if (receiver) { + zmq_assert (in_batch_size > 0); + size_t max_tsdu_size = get_max_tsdu_size (); + pgm_msgv_len = (int) in_batch_size / max_tsdu_size; + if ((int) in_batch_size % max_tsdu_size) + pgm_msgv_len++; + zmq_assert (pgm_msgv_len); + + pgm_msgv = (pgm_msgv_t*) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len); + } + return 0; } zmq::pgm_socket_t::~pgm_socket_t () { - // Celanup. - if (pgm_msgv) { - delete [] pgm_msgv; - } - - if (transport) - close_transport (); -} - -void zmq::pgm_socket_t::close_transport () -{ - // transport has to be valid. - zmq_assert (transport); - - pgm_transport_destroy (transport, TRUE); - - transport = NULL; + if (pgm_msgv) + free (pgm_msgv); + if (transport) + pgm_transport_destroy (transport, TRUE); } // Get receiver fds. recv_fd is from transport->recv_sock @@ -401,7 +374,7 @@ void zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_, // Get fds and store them into user allocated memory. // sender_fd is from pgm_transport->send_sock. // receive_fd_ is from transport->recv_sock. -// rdata_notify_fd_ is from transport->rdata_notify (PGM2 only). +// rdata_notify_fd_ is from transport->rdata_notify. void zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, int *rdata_notify_fd_) { @@ -445,52 +418,9 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) // Return max TSDU size without fragmentation from current PGM transport. size_t zmq::pgm_socket_t::get_max_tsdu_size () { - return (size_t)pgm_transport_max_tsdu (transport, false); -} - -// Returns how many APDUs are needed to fill reading buffer. -size_t zmq::pgm_socket_t::get_max_apdu_at_once (size_t readbuf_size_) -{ - zmq_assert (readbuf_size_ > 0); - - // Read max TSDU size without fragmentation. - size_t max_tsdu_size = get_max_tsdu_size (); - - // Calculate number of APDUs needed to fill the reading buffer. - size_t apdu_count = (int)readbuf_size_ / max_tsdu_size; - - if ((int) readbuf_size_ % max_tsdu_size) - apdu_count ++; - - // Have to have at least one APDU. - zmq_assert (apdu_count); - - return apdu_count; -} - -// Allocate buffer for one packet from the transmit window, The memory buffer -// is owned by the transmit window and so must be returned to the window with -// content via pgm_transport_send() calls or unused with pgm_packetv_free1(). -void *zmq::pgm_socket_t::get_buffer (size_t *size_) -{ - // Store size. - *size_ = get_max_tsdu_size (); - - // Allocate buffer. - // TODO: use malloc instead of new - unsigned char *apdu_buff = new unsigned char [*size_]; - zmq_assert (apdu_buff); - return apdu_buff; -} - -// Return an unused packet allocated from the transmit window -// via pgm_packetv_alloc(). -void zmq::pgm_socket_t::free_buffer (void *data_) -{ - delete [] (unsigned char*) data_; + return (size_t) pgm_transport_max_tsdu (transport, false); } - // pgm_transport_recvmsgv is called to fill the pgm_msgv array up to // pgm_msgv_len. In subsequent calls data from pgm_msgv structure are // returned. @@ -525,9 +455,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) const PGMIOStatus status = pgm_recvmsgv (transport, pgm_msgv, pgm_msgv_len, MSG_DONTWAIT, &nbytes_rec, &pgm_error); - if (status == PGM_IO_STATUS_ERROR) { - zmq_assert (false); - } + zmq_assert (status != PGM_IO_STATUS_ERROR); // In a case when no ODATA/RDATA fired POLLIN event (SPM...) // pgm_recvmsg returns ?. @@ -591,9 +519,7 @@ void zmq::pgm_socket_t::process_upstream () PGMIOStatus status = pgm_recvmsgv (transport, &dummy_msg, 1, MSG_DONTWAIT, &dummy_bytes, &pgm_error); - if (status == PGM_IO_STATUS_ERROR) { - zmq_assert (false); - } + zmq_assert (status != PGM_IO_STATUS_ERROR); // No data should be returned. zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index b1d0718..3b2740c 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -30,7 +30,6 @@ #include -#include "stdint.hpp" #include "options.hpp" namespace zmq @@ -62,11 +61,8 @@ namespace zmq // Send data as one APDU, transmit window owned memory. size_t send (unsigned char *data_, size_t data_len_); - // Allocates one slice for packet in tx window. - void *get_buffer (size_t *size_); - - // Fees memory allocated by get_buffer. - void free_buffer (void *data_); + // Returns max tsdu size without fragmentation. + size_t get_max_tsdu_size (); // Receive data from pgm socket. ssize_t receive (void **data_, const pgm_tsi_t **tsi_); @@ -76,21 +72,9 @@ namespace zmq void process_upstream (); private: - - // Open PGM transport. - int open_transport (); - - // Close transport. - void close_transport (); // OpenPGM transport pgm_transport_t* transport; - - // Returns max tsdu size without fragmentation. - size_t get_max_tsdu_size (); - - // Returns maximum count of apdus which fills readbuf_size_ - size_t get_max_apdu_at_once (size_t readbuf_size_); // Associated socket options. options_t options; @@ -98,19 +82,13 @@ namespace zmq // true when pgm_socket should create receiving side. bool receiver; - // TIBCO Rendezvous format network info. - char network [256]; - - // PGM transport port number. - uint16_t port_number; - - // If we are using UDP encapsulation. - bool udp_encapsulation; - - // Array of pgm_msgv_t structures to store received data + // Array of pgm_msgv_t structures to store received data // from the socket (pgm_transport_recvmsgv). pgm_msgv_t *pgm_msgv; + // Size of pgm_msgv array. + size_t pgm_msgv_len; + // How many bytes were read from pgm socket. size_t nbytes_rec; @@ -119,9 +97,6 @@ namespace zmq // How many messages from pgm_msgv were already sent up. size_t pgm_msgv_processed; - - // Size of pgm_msgv array. - size_t pgm_msgv_len; }; } #endif -- cgit v1.2.3