From 5852db451a76905336601c5ba3e4f33006f007fb Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 28 Dec 2009 11:51:06 +0100 Subject: PGM code cleanup --- src/pgm_socket.cpp | 178 ++++++++++++++++------------------------------------- 1 file changed, 52 insertions(+), 126 deletions(-) (limited to 'src/pgm_socket.cpp') diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index c3802b4..72698cf 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -31,34 +31,32 @@ #define CONFIG_HAVE_POLL #endif -#include +#include #include -#include #include "options.hpp" #include "pgm_socket.hpp" #include "config.hpp" #include "err.hpp" #include "uuid.hpp" +#include "stdint.hpp" zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : transport (NULL), options (options_), receiver (receiver_), - port_number (0), - udp_encapsulation (false), pgm_msgv (NULL), nbytes_rec (0), nbytes_processed (0), pgm_msgv_processed (0), pgm_msgv_len (0) { - } int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) { - udp_encapsulation = udp_encapsulation_; + // Can not open transport before destroying old one. + zmq_assert (transport == NULL); // Parse port number. const char *port_delim = strchr (network_, ':'); @@ -67,44 +65,21 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) return -1; } - port_number = atoi (port_delim + 1); + uint16_t port_number = atoi (port_delim + 1); + char network [256]; if (port_delim - network_ >= (int) sizeof (network) - 1) { errno = EINVAL; return -1; } - memset (network, '\0', sizeof (network)); memcpy (network, network_, port_delim - network_); - - // Open PGM transport. - int rc = open_transport (); - if (rc != 0) - return -1; - - // For receiver transport preallocate pgm_msgv array. - // in_batch_size configured in confing.hpp - if (receiver) { - pgm_msgv_len = get_max_apdu_at_once (in_batch_size); - // TODO: use malloc instead of new - pgm_msgv = new pgm_msgv_t [pgm_msgv_len]; - } - - return 0; -} - -int zmq::pgm_socket_t::open_transport () -{ - // Can not open transport before destroying old one. - zmq_assert (transport == NULL); - + // Zero counter used in msgrecv. nbytes_rec = 0; nbytes_processed = 0; pgm_msgv_processed = 0; - // TODO: Converting bool to int? Not nice. - int pgm_ok = true; GError *pgm_error = NULL; // Init PGM transport. @@ -135,8 +110,7 @@ int zmq::pgm_socket_t::open_transport () } rc = pgm_gsi_create_from_string (&gsi, gsi_base.c_str (), -1); - - if (rc != pgm_ok) { + if (rc != TRUE) { errno = EINVAL; return -1; } @@ -167,7 +141,7 @@ int zmq::pgm_socket_t::open_transport () res->ti_dport = port_number; // If we are using UDP encapsulation update gsr or res. - if (udp_encapsulation) { + if (udp_encapsulation_) { res->ti_udp_encap_ucast_port = port_number; res->ti_udp_encap_mcast_port = port_number; } @@ -192,14 +166,14 @@ int zmq::pgm_socket_t::open_transport () // Set maximum transport protocol data unit size (TPDU). rc = pgm_transport_set_max_tpdu (transport, pgm_max_tpdu); - if (rc != pgm_ok) { + if (rc != TRUE) { errno = EINVAL; return -1; } // Set maximum number of network hops to cross. rc = pgm_transport_set_hops (transport, 16); - if (rc != pgm_ok) { + if (rc != TRUE) { errno = EINVAL; return -1; } @@ -212,46 +186,45 @@ int zmq::pgm_socket_t::open_transport () if (receiver) { - // Receiver transport. + // Receiver transport. - // Set transport->can_send_data = FALSE. // Note that NAKs are still generated by the transport. rc = pgm_transport_set_recv_only (transport, true, false); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); if (options.rcvbuf) { rc = pgm_transport_set_rcvbuf (transport, (int) options.rcvbuf); - if (rc != pgm_ok) + if (rc != TRUE) return -1; } // Set NAK transmit back-off interval [us]. rc = pgm_transport_set_nak_bo_ivl (transport, 50 * 1000); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); // Set timeout before repeating NAK [us]. rc = pgm_transport_set_nak_rpt_ivl (transport, 200 * 1000); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); // Set timeout for receiving RDATA. rc = pgm_transport_set_nak_rdata_ivl (transport, 200 * 1000); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); // Set retries for NAK without NCF/DATA (NAK_DATA_RETRIES). rc = pgm_transport_set_nak_data_retries (transport, 5); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); // Set retries for NCF after NAK (NAK_NCF_RETRIES). rc = pgm_transport_set_nak_ncf_retries (transport, 2); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); // Set timeout for removing a dead peer [us]. rc = pgm_transport_set_peer_expiry (transport, 5 * 8192 * 1000); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); // Set expiration time of SPM Requests [us]. rc = pgm_transport_set_spmr_expiry (transport, 25 * 1000); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); // Set the size of the receive window. // Data rate is in [B/s]. options.rate is in [kb/s]. @@ -261,7 +234,7 @@ int zmq::pgm_socket_t::open_transport () } rc = pgm_transport_set_rxw_max_rte (transport, options.rate * 1000 / 8); - if (rc != pgm_ok) { + if (rc != TRUE) { errno = EINVAL; return -1; } @@ -272,7 +245,7 @@ int zmq::pgm_socket_t::open_transport () return -1; } rc = pgm_transport_set_rxw_secs (transport, options.recovery_ivl); - if (rc != pgm_ok) { + if (rc != TRUE) { errno = EINVAL; return -1; } @@ -281,13 +254,13 @@ int zmq::pgm_socket_t::open_transport () // Sender transport. - // Set transport->can_recv = FALSE, waiting_pipe will not be read. + // Waiting pipe won't be read. rc = pgm_transport_set_send_only (transport, TRUE); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); if (options.sndbuf) { rc = pgm_transport_set_sndbuf (transport, (int) options.sndbuf); - if (rc != pgm_ok) + if (rc != TRUE) return -1; } @@ -299,7 +272,7 @@ int zmq::pgm_socket_t::open_transport () } rc = pgm_transport_set_txw_max_rte (transport, options.rate * 1000 / 8); - if (rc != pgm_ok) { + if (rc != TRUE) { errno = EINVAL; return -1; } @@ -310,14 +283,14 @@ int zmq::pgm_socket_t::open_transport () return -1; } rc = pgm_transport_set_txw_secs (transport, options.recovery_ivl); - if (rc != pgm_ok) { + if (rc != TRUE) { errno = EINVAL; return -1; } // Set interval of background SPM packets [us]. rc = pgm_transport_set_ambient_spm (transport, 8192 * 1000); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); // Set intervals of data flushing SPM packets [us]. guint spm_heartbeat[] = {4 * 1000, 4 * 1000, 8 * 1000, 16 * 1000, @@ -325,13 +298,13 @@ int zmq::pgm_socket_t::open_transport () 1024 * 1000, 2048 * 1000, 4096 * 1000, 8192 * 1000}; rc = pgm_transport_set_heartbeat_spm (transport, spm_heartbeat, G_N_ELEMENTS(spm_heartbeat)); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); } // Enable multicast loopback. if (options.use_multicast_loop) { rc = pgm_transport_set_multicast_loop (transport, true); - zmq_assert (rc == pgm_ok); + zmq_assert (rc == TRUE); } // Bind a transport to the specified network devices. @@ -354,28 +327,28 @@ int zmq::pgm_socket_t::open_transport () zmq_assert (false); } + // For receiver transport preallocate pgm_msgv array. + // TODO: ? + if (receiver) { + zmq_assert (in_batch_size > 0); + size_t max_tsdu_size = get_max_tsdu_size (); + pgm_msgv_len = (int) in_batch_size / max_tsdu_size; + if ((int) in_batch_size % max_tsdu_size) + pgm_msgv_len++; + zmq_assert (pgm_msgv_len); + + pgm_msgv = (pgm_msgv_t*) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len); + } + return 0; } zmq::pgm_socket_t::~pgm_socket_t () { - // Celanup. - if (pgm_msgv) { - delete [] pgm_msgv; - } - - if (transport) - close_transport (); -} - -void zmq::pgm_socket_t::close_transport () -{ - // transport has to be valid. - zmq_assert (transport); - - pgm_transport_destroy (transport, TRUE); - - transport = NULL; + if (pgm_msgv) + free (pgm_msgv); + if (transport) + pgm_transport_destroy (transport, TRUE); } // Get receiver fds. recv_fd is from transport->recv_sock @@ -401,7 +374,7 @@ void zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_, // Get fds and store them into user allocated memory. // sender_fd is from pgm_transport->send_sock. // receive_fd_ is from transport->recv_sock. -// rdata_notify_fd_ is from transport->rdata_notify (PGM2 only). +// rdata_notify_fd_ is from transport->rdata_notify. void zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, int *rdata_notify_fd_) { @@ -445,52 +418,9 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) // Return max TSDU size without fragmentation from current PGM transport. size_t zmq::pgm_socket_t::get_max_tsdu_size () { - return (size_t)pgm_transport_max_tsdu (transport, false); -} - -// Returns how many APDUs are needed to fill reading buffer. -size_t zmq::pgm_socket_t::get_max_apdu_at_once (size_t readbuf_size_) -{ - zmq_assert (readbuf_size_ > 0); - - // Read max TSDU size without fragmentation. - size_t max_tsdu_size = get_max_tsdu_size (); - - // Calculate number of APDUs needed to fill the reading buffer. - size_t apdu_count = (int)readbuf_size_ / max_tsdu_size; - - if ((int) readbuf_size_ % max_tsdu_size) - apdu_count ++; - - // Have to have at least one APDU. - zmq_assert (apdu_count); - - return apdu_count; -} - -// Allocate buffer for one packet from the transmit window, The memory buffer -// is owned by the transmit window and so must be returned to the window with -// content via pgm_transport_send() calls or unused with pgm_packetv_free1(). -void *zmq::pgm_socket_t::get_buffer (size_t *size_) -{ - // Store size. - *size_ = get_max_tsdu_size (); - - // Allocate buffer. - // TODO: use malloc instead of new - unsigned char *apdu_buff = new unsigned char [*size_]; - zmq_assert (apdu_buff); - return apdu_buff; -} - -// Return an unused packet allocated from the transmit window -// via pgm_packetv_alloc(). -void zmq::pgm_socket_t::free_buffer (void *data_) -{ - delete [] (unsigned char*) data_; + return (size_t) pgm_transport_max_tsdu (transport, false); } - // pgm_transport_recvmsgv is called to fill the pgm_msgv array up to // pgm_msgv_len. In subsequent calls data from pgm_msgv structure are // returned. @@ -525,9 +455,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) const PGMIOStatus status = pgm_recvmsgv (transport, pgm_msgv, pgm_msgv_len, MSG_DONTWAIT, &nbytes_rec, &pgm_error); - if (status == PGM_IO_STATUS_ERROR) { - zmq_assert (false); - } + zmq_assert (status != PGM_IO_STATUS_ERROR); // In a case when no ODATA/RDATA fired POLLIN event (SPM...) // pgm_recvmsg returns ?. @@ -591,9 +519,7 @@ void zmq::pgm_socket_t::process_upstream () PGMIOStatus status = pgm_recvmsgv (transport, &dummy_msg, 1, MSG_DONTWAIT, &dummy_bytes, &pgm_error); - if (status == PGM_IO_STATUS_ERROR) { - zmq_assert (false); - } + zmq_assert (status != PGM_IO_STATUS_ERROR); // No data should be returned. zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || -- cgit v1.2.3