diff options
author | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:19 +0100 |
---|---|---|
committer | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:19 +0100 |
commit | a15854bd92db69fcd0b4444fe1b8fe3610a7acf6 (patch) | |
tree | 1214b945d0f0033ff318de367c70525ea141ef56 /src/pgm_socket.cpp |
Imported Upstream version 2.0.7.dfsgupstream/2.0.7.dfsg
Diffstat (limited to 'src/pgm_socket.cpp')
-rw-r--r-- | src/pgm_socket.cpp | 538 |
1 files changed, 538 insertions, 0 deletions
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp new file mode 100644 index 0000000..5a952a7 --- /dev/null +++ b/src/pgm_socket.cpp @@ -0,0 +1,538 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "platform.hpp" + +#ifdef ZMQ_HAVE_OPENPGM + +#ifdef ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#endif + +#ifdef ZMQ_HAVE_LINUX +#include <poll.h> +#endif + +#include <stdlib.h> +#include <string.h> +#include <string> + +#include "options.hpp" +#include "pgm_socket.hpp" +#include "config.hpp" +#include "err.hpp" +#include "uuid.hpp" +#include "stdint.hpp" + +zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : + transport (NULL), + options (options_), + receiver (receiver_), + pgm_msgv (NULL), + pgm_msgv_len (0), + nbytes_rec (0), + nbytes_processed (0), + pgm_msgv_processed (0) +{ +} + +int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) +{ + // Can not open transport before destroying old one. + zmq_assert (transport == NULL); + + // Parse port number. + const char *port_delim = strchr (network_, ':'); + if (!port_delim) { + errno = EINVAL; + return -1; + } + + uint16_t port_number = atoi (port_delim + 1); + + char network [256]; + if (port_delim - network_ >= (int) sizeof (network) - 1) { + errno = EINVAL; + return -1; + } + memset (network, '\0', sizeof (network)); + memcpy (network, network_, port_delim - network_); + + // Zero counter used in msgrecv. + nbytes_rec = 0; + nbytes_processed = 0; + pgm_msgv_processed = 0; + + int rc; + GError *pgm_error = NULL; + + // PGM transport GSI. + pgm_gsi_t gsi; + + std::string gsi_base; + + if (options.identity.size () > 0) { + + // Create gsi from identity. + // TODO: We assume that identity is standard C string here. + // What if it contains binary zeroes? + gsi_base.assign ((const char*) options.identity.data (), + options.identity.size ()); + } else { + + // Generate random gsi. + gsi_base = uuid_t ().to_string (); + } + + rc = pgm_gsi_create_from_string (&gsi, gsi_base.c_str (), -1); + if (rc != TRUE) { + errno = EINVAL; + return -1; + } + + struct pgm_transport_info_t *res = NULL; + struct pgm_transport_info_t hint; + memset (&hint, 0, sizeof (hint)); + hint.ti_family = AF_INET; + + if (!pgm_if_get_transport_info (network, &hint, &res, &pgm_error)) { + if (pgm_error->domain == PGM_IF_ERROR && ( + pgm_error->code == PGM_IF_ERROR_INVAL || + pgm_error->code == PGM_IF_ERROR_XDEV || + pgm_error->code == PGM_IF_ERROR_NODEV || + pgm_error->code == PGM_IF_ERROR_NOTUNIQ || + pgm_error->code == PGM_IF_ERROR_ADDRFAMILY || + pgm_error->code == PGM_IF_ERROR_FAMILY || + pgm_error->code == PGM_IF_ERROR_NODATA || + pgm_error->code == PGM_IF_ERROR_NONAME || + pgm_error->code == PGM_IF_ERROR_SERVICE)) { + g_error_free (pgm_error); + errno = EINVAL; + return -1; + } + + zmq_assert (false); + } + + res->ti_gsi = gsi; + res->ti_dport = port_number; + + // If we are using UDP encapsulation update gsr or res. + if (udp_encapsulation_) { + res->ti_udp_encap_ucast_port = port_number; + res->ti_udp_encap_mcast_port = port_number; + } + + if (!pgm_transport_create (&transport, res, &pgm_error)) { + if (pgm_error->domain == PGM_TRANSPORT_ERROR && ( + pgm_error->code == PGM_TRANSPORT_ERROR_INVAL || + pgm_error->code == PGM_TRANSPORT_ERROR_PERM || + pgm_error->code == PGM_TRANSPORT_ERROR_NODEV)) { + pgm_if_free_transport_info (res); + g_error_free (pgm_error); + errno = EINVAL; + return -1; + } + + zmq_assert (false); + } + + pgm_if_free_transport_info (res); + + // Common parameters for receiver and sender. + + // Set maximum transport protocol data unit size (TPDU). + rc = pgm_transport_set_max_tpdu (transport, pgm_max_tpdu); + if (rc != TRUE) { + errno = EINVAL; + return -1; + } + + // Set maximum number of network hops to cross. + rc = pgm_transport_set_hops (transport, 16); + if (rc != TRUE) { + errno = EINVAL; + return -1; + } + + // Set nonblocking send/recv sockets. + if (!pgm_transport_set_nonblocking (transport, true)) { + errno = EINVAL; + return -1; + } + + if (receiver) { + + // Receiver transport. + + // Note that NAKs are still generated by the transport. + rc = pgm_transport_set_recv_only (transport, true, false); + zmq_assert (rc == TRUE); + + if (options.rcvbuf) { + rc = pgm_transport_set_rcvbuf (transport, (int) options.rcvbuf); + if (rc != TRUE) + return -1; + } + + // Set NAK transmit back-off interval [us]. + rc = pgm_transport_set_nak_bo_ivl (transport, 50 * 1000); + zmq_assert (rc == TRUE); + + // Set timeout before repeating NAK [us]. + rc = pgm_transport_set_nak_rpt_ivl (transport, 200 * 1000); + zmq_assert (rc == TRUE); + + // Set timeout for receiving RDATA. + rc = pgm_transport_set_nak_rdata_ivl (transport, 200 * 1000); + zmq_assert (rc == TRUE); + + // Set retries for NAK without NCF/DATA (NAK_DATA_RETRIES). + rc = pgm_transport_set_nak_data_retries (transport, 5); + zmq_assert (rc == TRUE); + + // Set retries for NCF after NAK (NAK_NCF_RETRIES). + rc = pgm_transport_set_nak_ncf_retries (transport, 2); + zmq_assert (rc == TRUE); + + // Set timeout for removing a dead peer [us]. + rc = pgm_transport_set_peer_expiry (transport, 5 * 8192 * 1000); + zmq_assert (rc == TRUE); + + // Set expiration time of SPM Requests [us]. + rc = pgm_transport_set_spmr_expiry (transport, 25 * 1000); + zmq_assert (rc == TRUE); + + // Set the size of the receive window. + // Data rate is in [B/s]. options.rate is in [kb/s]. + if (options.rate <= 0) { + errno = EINVAL; + return -1; + } + rc = pgm_transport_set_rxw_max_rte (transport, + options.rate * 1000 / 8); + if (rc != TRUE) { + errno = EINVAL; + return -1; + } + + // Recovery interval [s]. + if (options.recovery_ivl <= 0) { + errno = EINVAL; + return -1; + } + rc = pgm_transport_set_rxw_secs (transport, options.recovery_ivl); + if (rc != TRUE) { + errno = EINVAL; + return -1; + } + + } else { + + // Sender transport. + + // Waiting pipe won't be read. + rc = pgm_transport_set_send_only (transport, TRUE); + zmq_assert (rc == TRUE); + + if (options.sndbuf) { + rc = pgm_transport_set_sndbuf (transport, (int) options.sndbuf); + if (rc != TRUE) + return -1; + } + + // Set the size of the send window. + // Data rate is in [B/s] options.rate is in [kb/s]. + if (options.rate <= 0) { + errno = EINVAL; + return -1; + } + rc = pgm_transport_set_txw_max_rte (transport, + options.rate * 1000 / 8); + if (rc != TRUE) { + errno = EINVAL; + return -1; + } + + // Recovery interval [s]. + if (options.recovery_ivl <= 0) { + errno = EINVAL; + return -1; + } + rc = pgm_transport_set_txw_secs (transport, options.recovery_ivl); + if (rc != TRUE) { + errno = EINVAL; + return -1; + } + + // Set interval of background SPM packets [us]. + rc = pgm_transport_set_ambient_spm (transport, 8192 * 1000); + zmq_assert (rc == TRUE); + + // Set intervals of data flushing SPM packets [us]. + guint spm_heartbeat[] = {4 * 1000, 4 * 1000, 8 * 1000, 16 * 1000, + 32 * 1000, 64 * 1000, 128 * 1000, 256 * 1000, 512 * 1000, + 1024 * 1000, 2048 * 1000, 4096 * 1000, 8192 * 1000}; + rc = pgm_transport_set_heartbeat_spm (transport, spm_heartbeat, + G_N_ELEMENTS(spm_heartbeat)); + zmq_assert (rc == TRUE); + } + + // Enable multicast loopback. + if (options.use_multicast_loop) { + rc = pgm_transport_set_multicast_loop (transport, true); + zmq_assert (rc == TRUE); + } + + // Bind a transport to the specified network devices. + if (!pgm_transport_bind (transport, &pgm_error)) { + if (pgm_error->domain == PGM_IF_ERROR && ( + pgm_error->code == PGM_IF_ERROR_INVAL || + pgm_error->code == PGM_IF_ERROR_XDEV || + pgm_error->code == PGM_IF_ERROR_NODEV || + pgm_error->code == PGM_IF_ERROR_NOTUNIQ || + pgm_error->code == PGM_IF_ERROR_ADDRFAMILY || + pgm_error->code == PGM_IF_ERROR_FAMILY || + pgm_error->code == PGM_IF_ERROR_NODATA || + pgm_error->code == PGM_IF_ERROR_NONAME || + pgm_error->code == PGM_IF_ERROR_SERVICE)) { + g_error_free (pgm_error); + errno = EINVAL; + return -1; + } + if (pgm_error->domain == PGM_TRANSPORT_ERROR && ( + pgm_error->code == PGM_TRANSPORT_ERROR_FAILED)) { + g_error_free (pgm_error); + errno = EINVAL; + return -1; + } + + zmq_assert (false); + } + + // For receiver transport preallocate pgm_msgv array. + // TODO: ? + if (receiver) { + zmq_assert (in_batch_size > 0); + size_t max_tsdu_size = get_max_tsdu_size (); + pgm_msgv_len = (int) in_batch_size / max_tsdu_size; + if ((int) in_batch_size % max_tsdu_size) + pgm_msgv_len++; + zmq_assert (pgm_msgv_len); + + pgm_msgv = (pgm_msgv_t*) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len); + } + + return 0; +} + +zmq::pgm_socket_t::~pgm_socket_t () +{ + if (pgm_msgv) + free (pgm_msgv); + if (transport) + pgm_transport_destroy (transport, TRUE); +} + +// Get receiver fds. recv_fd is from transport->recv_sock +// waiting_pipe_fd is from transport->waiting_pipe [0] +void zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_, + int *waiting_pipe_fd_) +{ + zmq_assert (receive_fd_); + zmq_assert (waiting_pipe_fd_); + + // recv_sock2 should not be used - check it. + zmq_assert (transport->recv_sock2 == -1); + + // Check if transport can receive data and can not send. + zmq_assert (transport->can_recv_data); + zmq_assert (!transport->can_send_data); + + // Take FDs directly from transport. + *receive_fd_ = pgm_transport_get_recv_fd (transport); + *waiting_pipe_fd_ = pgm_transport_get_pending_fd (transport); +} + +// Get fds and store them into user allocated memory. +// sender_fd is from pgm_transport->send_sock. +// receive_fd_ is from transport->recv_sock. +// rdata_notify_fd_ is from transport->rdata_notify. +// pending_notify_fd_ is from transport->pending_notify. +void zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, + int *rdata_notify_fd_, int *pending_notify_fd_) +{ + zmq_assert (send_fd_); + zmq_assert (receive_fd_); + + zmq_assert (rdata_notify_fd_); + zmq_assert (pending_notify_fd_); + + // recv_sock2 should not be used - check it. + zmq_assert (transport->recv_sock2 == -1); + + // Check if transport can send data and can not receive. + zmq_assert (transport->can_send_data); + zmq_assert (!transport->can_recv_data); + + // Take FDs from transport. + *send_fd_ = pgm_transport_get_send_fd (transport); + *receive_fd_ = pgm_transport_get_recv_fd (transport); + + *rdata_notify_fd_ = pgm_transport_get_repair_fd (transport); + *pending_notify_fd_ = pgm_transport_get_pending_fd (transport); +} + +// Send one APDU, transmit window owned memory. +size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) +{ + size_t nbytes = 0; + + PGMIOStatus status = pgm_send (transport, data_, data_len_, &nbytes); + + if (nbytes != data_len_) { + zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED); + zmq_assert (nbytes == 0); + } + + // We have to write all data as one packet. + if (nbytes > 0) + zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_); + + return nbytes; +} + +// Return max TSDU size without fragmentation from current PGM transport. +size_t zmq::pgm_socket_t::get_max_tsdu_size () +{ + return (size_t) pgm_transport_max_tsdu (transport, false); +} + +// pgm_transport_recvmsgv is called to fill the pgm_msgv array up to +// pgm_msgv_len. In subsequent calls data from pgm_msgv structure are +// returned. +ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) +{ + size_t raw_data_len = 0; + + // We just sent all data from pgm_transport_recvmsgv up + // and have to return 0 that another engine in this thread is scheduled. + if (nbytes_rec == nbytes_processed && nbytes_rec > 0) { + + // Reset all the counters. + nbytes_rec = 0; + nbytes_processed = 0; + pgm_msgv_processed = 0; + return 0; + } + + // If we have are going first time or if we have processed all pgm_msgv_t + // structure previously read from the pgm socket. + if (nbytes_rec == nbytes_processed) { + + // Check program flow. + zmq_assert (pgm_msgv_processed == 0); + zmq_assert (nbytes_processed == 0); + zmq_assert (nbytes_rec == 0); + + // Receive a vector of Application Protocol Domain Unit's (APDUs) + // from the transport. + GError *pgm_error = NULL; + + const PGMIOStatus status = pgm_recvmsgv (transport, pgm_msgv, + pgm_msgv_len, MSG_DONTWAIT, &nbytes_rec, &pgm_error); + + zmq_assert (status != PGM_IO_STATUS_ERROR); + + // In a case when no ODATA/RDATA fired POLLIN event (SPM...) + // pgm_recvmsg returns ?. + if (status == PGM_IO_STATUS_TIMER_PENDING) { + + zmq_assert (nbytes_rec == 0); + + // In case if no RDATA/ODATA caused POLLIN 0 is + // returned. + nbytes_rec = 0; + return 0; + } + + // Data loss. + if (status == PGM_IO_STATUS_RESET) { + + pgm_peer_t* peer = (pgm_peer_t*) transport->peers_pending->data; + + // Save lost data TSI. + *tsi_ = &peer->tsi; + nbytes_rec = 0; + + // In case of dala loss -1 is returned. + errno = EINVAL; + g_error_free (pgm_error); + return -1; + } + + zmq_assert (status == PGM_IO_STATUS_NORMAL); + } + else + { + zmq_assert (pgm_msgv_processed <= pgm_msgv_len); + } + + zmq_assert (nbytes_rec > 0); + + // Only one APDU per pgm_msgv_t structure is allowed. + zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1); + + struct pgm_sk_buff_t* skb = + pgm_msgv [pgm_msgv_processed].msgv_skb [0]; + + // Take pointers from pgm_msgv_t structure. + *raw_data_ = skb->data; + raw_data_len = skb->len; + + // Save current TSI. + *tsi_ = &skb->tsi; + + // Move the the next pgm_msgv_t structure. + pgm_msgv_processed++; + zmq_assert (pgm_msgv_processed <= pgm_msgv_len); + nbytes_processed +=raw_data_len; + + return raw_data_len; +} + +void zmq::pgm_socket_t::process_upstream () +{ + pgm_msgv_t dummy_msg; + + size_t dummy_bytes = 0; + GError *pgm_error = NULL; + + PGMIOStatus status = pgm_recvmsgv (transport, &dummy_msg, + 1, MSG_DONTWAIT, &dummy_bytes, &pgm_error); + + zmq_assert (status != PGM_IO_STATUS_ERROR); + + // No data should be returned. + zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || + status == PGM_IO_STATUS_RATE_LIMITED)); +} + +#endif + |