diff options
author | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:35 +0100 |
---|---|---|
committer | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:35 +0100 |
commit | e645fc2693acc796304498909786b7b47005b429 (patch) | |
tree | 4118cd4c7b9eba3ba1d6892800c79669ea94c4e9 /src/pgm_socket.cpp | |
parent | 2c416a793ea781273a5da6742211f5f01af13a2b (diff) |
Imported Upstream version 2.1.3upstream/2.1.3
Diffstat (limited to 'src/pgm_socket.cpp')
-rw-r--r-- | src/pgm_socket.cpp | 719 |
1 files changed, 443 insertions, 276 deletions
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 5a952a7..5a82907 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -1,19 +1,20 @@ /* - Copyright (c) 2007-2010 iMatix Corporation + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by + the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. 0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. + GNU Lesser General Public License for more details. - You should have received a copy of the Lesser GNU General Public License + You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. */ @@ -40,8 +41,12 @@ #include "uuid.hpp" #include "stdint.hpp" +#ifndef MSG_ERRQUEUE +#define MSG_ERRQUEUE 0x2000 +#endif + zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : - transport (NULL), + sock (NULL), options (options_), receiver (receiver_), pgm_msgv (NULL), @@ -52,13 +57,18 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : { } +// Create, bind and connect PGM socket. +// network_ of the form <interface & multicast group decls>:<IP port> +// e.g. eth0;239.192.0.1:7500 +// link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000 +// ;[fe80::1%en0]:7500 int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) { // Can not open transport before destroying old one. - zmq_assert (transport == NULL); + zmq_assert (sock == NULL); - // Parse port number. - const char *port_delim = strchr (network_, ':'); + // Parse port number, start from end for IPv6 + const char *port_delim = strrchr (network_, ':'); if (!port_delim) { errno = EINVAL; return -1; @@ -73,261 +83,276 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) } memset (network, '\0', sizeof (network)); memcpy (network, network_, port_delim - network_); - + + // Validate socket options + // Data rate is in [B/s]. options.rate is in [kb/s]. + if (options.rate <= 0) { + errno = EINVAL; + return -1; + } + // Recovery interval [s] or [ms] - based on the user's call + if ((options.recovery_ivl <= 0) && (options.recovery_ivl_msec <= 0)) { + errno = EINVAL; + return -1; + } + // Zero counter used in msgrecv. nbytes_rec = 0; nbytes_processed = 0; pgm_msgv_processed = 0; - int rc; - GError *pgm_error = NULL; + pgm_error_t *pgm_error = NULL; + struct pgm_addrinfo_t hints, *res = NULL; + sa_family_t sa_family; - // PGM transport GSI. - pgm_gsi_t gsi; - - std::string gsi_base; - - if (options.identity.size () > 0) { - - // Create gsi from identity. - // TODO: We assume that identity is standard C string here. - // What if it contains binary zeroes? - gsi_base.assign ((const char*) options.identity.data (), - options.identity.size ()); - } else { + memset (&hints, 0, sizeof (hints)); + hints.ai_family = AF_UNSPEC; + if (!pgm_getaddrinfo (network, NULL, &res, &pgm_error)) { - // Generate random gsi. - gsi_base = uuid_t ().to_string (); - } + // Invalid parameters don't set pgm_error_t. + zmq_assert (pgm_error != NULL); + if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && ( - rc = pgm_gsi_create_from_string (&gsi, gsi_base.c_str (), -1); - if (rc != TRUE) { - errno = EINVAL; - return -1; - } + // NB: cannot catch EAI_BADFLAGS. + pgm_error->code != PGM_ERROR_SERVICE && + pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) - struct pgm_transport_info_t *res = NULL; - struct pgm_transport_info_t hint; - memset (&hint, 0, sizeof (hint)); - hint.ti_family = AF_INET; - - if (!pgm_if_get_transport_info (network, &hint, &res, &pgm_error)) { - if (pgm_error->domain == PGM_IF_ERROR && ( - pgm_error->code == PGM_IF_ERROR_INVAL || - pgm_error->code == PGM_IF_ERROR_XDEV || - pgm_error->code == PGM_IF_ERROR_NODEV || - pgm_error->code == PGM_IF_ERROR_NOTUNIQ || - pgm_error->code == PGM_IF_ERROR_ADDRFAMILY || - pgm_error->code == PGM_IF_ERROR_FAMILY || - pgm_error->code == PGM_IF_ERROR_NODATA || - pgm_error->code == PGM_IF_ERROR_NONAME || - pgm_error->code == PGM_IF_ERROR_SERVICE)) { - g_error_free (pgm_error); - errno = EINVAL; - return -1; - } + // User, host, or network configuration or transient error. + goto err_abort; + // Fatal OpenPGM internal error. zmq_assert (false); } - res->ti_gsi = gsi; - res->ti_dport = port_number; + zmq_assert (res != NULL); - // If we are using UDP encapsulation update gsr or res. - if (udp_encapsulation_) { - res->ti_udp_encap_ucast_port = port_number; - res->ti_udp_encap_mcast_port = port_number; - } + // Pick up detected IP family. + sa_family = res->ai_send_addrs[0].gsr_group.ss_family; - if (!pgm_transport_create (&transport, res, &pgm_error)) { - if (pgm_error->domain == PGM_TRANSPORT_ERROR && ( - pgm_error->code == PGM_TRANSPORT_ERROR_INVAL || - pgm_error->code == PGM_TRANSPORT_ERROR_PERM || - pgm_error->code == PGM_TRANSPORT_ERROR_NODEV)) { - pgm_if_free_transport_info (res); - g_error_free (pgm_error); - errno = EINVAL; - return -1; + // Create IP/PGM or UDP/PGM socket. + if (udp_encapsulation_) { + if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, + &pgm_error)) { + + // Invalid parameters don't set pgm_error_t. + zmq_assert (pgm_error != NULL); + if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && ( + pgm_error->code != PGM_ERROR_BADF && + pgm_error->code != PGM_ERROR_FAULT && + pgm_error->code != PGM_ERROR_NOPROTOOPT && + pgm_error->code != PGM_ERROR_FAILED)) + + // User, host, or network configuration or transient error. + goto err_abort; + + // Fatal OpenPGM internal error. + zmq_assert (false); } - zmq_assert (false); + // All options are of data type int + const int encapsulation_port = port_number; + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, + &encapsulation_port, sizeof (encapsulation_port))) + goto err_abort; + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, + &encapsulation_port, sizeof (encapsulation_port))) + goto err_abort; } - - pgm_if_free_transport_info (res); - - // Common parameters for receiver and sender. - - // Set maximum transport protocol data unit size (TPDU). - rc = pgm_transport_set_max_tpdu (transport, pgm_max_tpdu); - if (rc != TRUE) { - errno = EINVAL; - return -1; + else { + if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, + &pgm_error)) { + + // Invalid parameters don't set pgm_error_t. + zmq_assert (pgm_error != NULL); + if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && ( + pgm_error->code != PGM_ERROR_BADF && + pgm_error->code != PGM_ERROR_FAULT && + pgm_error->code != PGM_ERROR_NOPROTOOPT && + pgm_error->code != PGM_ERROR_FAILED)) + + // User, host, or network configuration or transient error. + goto err_abort; + + // Fatal OpenPGM internal error. + zmq_assert (false); + } } - // Set maximum number of network hops to cross. - rc = pgm_transport_set_hops (transport, 16); - if (rc != TRUE) { - errno = EINVAL; - return -1; - } + { + const int rcvbuf = (int) options.rcvbuf, + sndbuf = (int) options.sndbuf, + max_tpdu = (int) pgm_max_tpdu; + if (rcvbuf) { + if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf, + sizeof (rcvbuf))) + goto err_abort; + } + if (sndbuf) { + if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, + sizeof (sndbuf))) + goto err_abort; + } - // Set nonblocking send/recv sockets. - if (!pgm_transport_set_nonblocking (transport, true)) { - errno = EINVAL; - return -1; + // Set maximum transport protocol data unit size (TPDU). + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu, + sizeof (max_tpdu))) + goto err_abort; } if (receiver) { + const int recv_only = 1, + rxw_max_tpdu = (int) pgm_max_tpdu, + rxw_sqns = compute_sqns (rxw_max_tpdu), + peer_expiry = pgm_secs (300), + spmr_expiry = pgm_msecs (25), + nak_bo_ivl = pgm_msecs (50), + nak_rpt_ivl = pgm_msecs (200), + nak_rdata_ivl = pgm_msecs (200), + nak_data_retries = 50, + nak_ncf_retries = 50; + + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only, + sizeof (recv_only)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns, + sizeof (rxw_sqns)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, + sizeof (peer_expiry)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, + sizeof (spmr_expiry)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl, + sizeof (nak_bo_ivl)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl, + sizeof (nak_rpt_ivl)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL, + &nak_rdata_ivl, sizeof (nak_rdata_ivl)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES, + &nak_data_retries, sizeof (nak_data_retries)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, + &nak_ncf_retries, sizeof (nak_ncf_retries))) + goto err_abort; + } else { + const int send_only = 1, + max_rte = (int) ((options.rate * 1000) / 8), + txw_max_tpdu = (int) pgm_max_tpdu, + txw_sqns = compute_sqns (txw_max_tpdu), + ambient_spm = pgm_secs (30), + heartbeat_spm[] = { pgm_msecs (100), + pgm_msecs (100), + pgm_msecs (100), + pgm_msecs (100), + pgm_msecs (1300), + pgm_secs (7), + pgm_secs (16), + pgm_secs (25), + pgm_secs (30) }; + + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY, + &send_only, sizeof (send_only)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_ODATA_MAX_RTE, + &max_rte, sizeof (max_rte)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS, + &txw_sqns, sizeof (txw_sqns)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM, + &ambient_spm, sizeof (ambient_spm)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM, + &heartbeat_spm, sizeof (heartbeat_spm))) + goto err_abort; + } - // Receiver transport. - - // Note that NAKs are still generated by the transport. - rc = pgm_transport_set_recv_only (transport, true, false); - zmq_assert (rc == TRUE); - - if (options.rcvbuf) { - rc = pgm_transport_set_rcvbuf (transport, (int) options.rcvbuf); - if (rc != TRUE) - return -1; - } - - // Set NAK transmit back-off interval [us]. - rc = pgm_transport_set_nak_bo_ivl (transport, 50 * 1000); - zmq_assert (rc == TRUE); - - // Set timeout before repeating NAK [us]. - rc = pgm_transport_set_nak_rpt_ivl (transport, 200 * 1000); - zmq_assert (rc == TRUE); - - // Set timeout for receiving RDATA. - rc = pgm_transport_set_nak_rdata_ivl (transport, 200 * 1000); - zmq_assert (rc == TRUE); - - // Set retries for NAK without NCF/DATA (NAK_DATA_RETRIES). - rc = pgm_transport_set_nak_data_retries (transport, 5); - zmq_assert (rc == TRUE); - - // Set retries for NCF after NAK (NAK_NCF_RETRIES). - rc = pgm_transport_set_nak_ncf_retries (transport, 2); - zmq_assert (rc == TRUE); - - // Set timeout for removing a dead peer [us]. - rc = pgm_transport_set_peer_expiry (transport, 5 * 8192 * 1000); - zmq_assert (rc == TRUE); - - // Set expiration time of SPM Requests [us]. - rc = pgm_transport_set_spmr_expiry (transport, 25 * 1000); - zmq_assert (rc == TRUE); + // PGM transport GSI. + struct pgm_sockaddr_t addr; - // Set the size of the receive window. - // Data rate is in [B/s]. options.rate is in [kb/s]. - if (options.rate <= 0) { - errno = EINVAL; - return -1; - } - rc = pgm_transport_set_rxw_max_rte (transport, - options.rate * 1000 / 8); - if (rc != TRUE) { - errno = EINVAL; - return -1; - } + memset (&addr, 0, sizeof(addr)); + addr.sa_port = port_number; + addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT; - // Recovery interval [s]. - if (options.recovery_ivl <= 0) { - errno = EINVAL; - return -1; - } - rc = pgm_transport_set_rxw_secs (transport, options.recovery_ivl); - if (rc != TRUE) { - errno = EINVAL; - return -1; - } + if (options.identity.size () > 0) { + // Create gsi from identity. + if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, + options.identity.data (), options.identity.size ())) + goto err_abort; } else { - // Sender transport. + // Generate random gsi. + std::string gsi_base = uuid_t ().to_string (); + if (!pgm_gsi_create_from_string (&addr.sa_addr.gsi, + gsi_base.c_str (), -1)) + goto err_abort; + } - // Waiting pipe won't be read. - rc = pgm_transport_set_send_only (transport, TRUE); - zmq_assert (rc == TRUE); + // Bind a transport to the specified network devices. + struct pgm_interface_req_t if_req; + memset (&if_req, 0, sizeof(if_req)); + if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface; + if_req.ir_scope_id = 0; + if (AF_INET6 == sa_family) { + struct sockaddr_in6 sa6; + memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof (sa6)); + if_req.ir_scope_id = sa6.sin6_scope_id; + } + if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req), + &if_req, sizeof (if_req), &pgm_error)) { - if (options.sndbuf) { - rc = pgm_transport_set_sndbuf (transport, (int) options.sndbuf); - if (rc != TRUE) - return -1; - } + // Invalid parameters don't set pgm_error_t. + zmq_assert (pgm_error != NULL); + if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET || + pgm_error->domain == PGM_ERROR_DOMAIN_IF) && ( + pgm_error->code != PGM_ERROR_INVAL && + pgm_error->code != PGM_ERROR_BADF && + pgm_error->code != PGM_ERROR_FAULT)) - // Set the size of the send window. - // Data rate is in [B/s] options.rate is in [kb/s]. - if (options.rate <= 0) { - errno = EINVAL; - return -1; - } - rc = pgm_transport_set_txw_max_rte (transport, - options.rate * 1000 / 8); - if (rc != TRUE) { - errno = EINVAL; - return -1; - } + // User, host, or network configuration or transient error. + goto err_abort; - // Recovery interval [s]. - if (options.recovery_ivl <= 0) { - errno = EINVAL; - return -1; - } - rc = pgm_transport_set_txw_secs (transport, options.recovery_ivl); - if (rc != TRUE) { - errno = EINVAL; - return -1; - } + // Fatal OpenPGM internal error. + zmq_assert (false); + } - // Set interval of background SPM packets [us]. - rc = pgm_transport_set_ambient_spm (transport, 8192 * 1000); - zmq_assert (rc == TRUE); - - // Set intervals of data flushing SPM packets [us]. - guint spm_heartbeat[] = {4 * 1000, 4 * 1000, 8 * 1000, 16 * 1000, - 32 * 1000, 64 * 1000, 128 * 1000, 256 * 1000, 512 * 1000, - 1024 * 1000, 2048 * 1000, 4096 * 1000, 8192 * 1000}; - rc = pgm_transport_set_heartbeat_spm (transport, spm_heartbeat, - G_N_ELEMENTS(spm_heartbeat)); - zmq_assert (rc == TRUE); + // Join IP multicast groups. + for (unsigned i = 0; i < res->ai_recv_addrs_len; i++) { + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_JOIN_GROUP, + &res->ai_recv_addrs [i], sizeof (struct group_req))) + goto err_abort; } - - // Enable multicast loopback. - if (options.use_multicast_loop) { - rc = pgm_transport_set_multicast_loop (transport, true); - zmq_assert (rc == TRUE); + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP, + &res->ai_send_addrs [0], sizeof (struct group_req))) + goto err_abort; + + pgm_freeaddrinfo (res); + res = NULL; + + // Set IP level parameters. + { + const int nonblocking = 1, + multicast_loop = options.use_multicast_loop ? 1 : 0, + multicast_hops = 16, + + // Expedited Forwarding PHB for network elements, no ECN. + dscp = 0x2e << 2; + + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP, + &multicast_loop, sizeof (multicast_loop)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, + &multicast_hops, sizeof (multicast_hops))) + goto err_abort; + if (AF_INET6 != sa_family && !pgm_setsockopt (sock, + IPPROTO_PGM, PGM_TOS, &dscp, sizeof (dscp))) + goto err_abort; + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK, + &nonblocking, sizeof (nonblocking))) + goto err_abort; } - // Bind a transport to the specified network devices. - if (!pgm_transport_bind (transport, &pgm_error)) { - if (pgm_error->domain == PGM_IF_ERROR && ( - pgm_error->code == PGM_IF_ERROR_INVAL || - pgm_error->code == PGM_IF_ERROR_XDEV || - pgm_error->code == PGM_IF_ERROR_NODEV || - pgm_error->code == PGM_IF_ERROR_NOTUNIQ || - pgm_error->code == PGM_IF_ERROR_ADDRFAMILY || - pgm_error->code == PGM_IF_ERROR_FAMILY || - pgm_error->code == PGM_IF_ERROR_NODATA || - pgm_error->code == PGM_IF_ERROR_NONAME || - pgm_error->code == PGM_IF_ERROR_SERVICE)) { - g_error_free (pgm_error); - errno = EINVAL; - return -1; - } - if (pgm_error->domain == PGM_TRANSPORT_ERROR && ( - pgm_error->code == PGM_TRANSPORT_ERROR_FAILED)) { - g_error_free (pgm_error); - errno = EINVAL; - return -1; - } + // Connect PGM transport to start state machine. + if (!pgm_connect (sock, &pgm_error)) { - zmq_assert (false); + // Invalid parameters don't set pgm_error_t. + zmq_assert (pgm_error != NULL); + goto err_abort; } // For receiver transport preallocate pgm_msgv array. - // TODO: ? if (receiver) { zmq_assert (in_batch_size > 0); size_t max_tsdu_size = get_max_tsdu_size (); @@ -337,96 +362,176 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) zmq_assert (pgm_msgv_len); pgm_msgv = (pgm_msgv_t*) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len); + alloc_assert (pgm_msgv); } return 0; + +err_abort: + if (sock != NULL) { + pgm_close (sock, FALSE); + sock = NULL; + } + if (res != NULL) { + pgm_freeaddrinfo (res); + res = NULL; + } + if (pgm_error != NULL) { + pgm_error_free (pgm_error); + pgm_error = NULL; + } + errno = EINVAL; + return -1; } zmq::pgm_socket_t::~pgm_socket_t () { if (pgm_msgv) free (pgm_msgv); - if (transport) - pgm_transport_destroy (transport, TRUE); + if (sock) + pgm_close (sock, TRUE); } -// Get receiver fds. recv_fd is from transport->recv_sock -// waiting_pipe_fd is from transport->waiting_pipe [0] +// Get receiver fds. receive_fd_ is signaled for incoming packets, +// waiting_pipe_fd_ is signaled for state driven events and data. void zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_, int *waiting_pipe_fd_) { + socklen_t socklen; + bool rc; + zmq_assert (receive_fd_); zmq_assert (waiting_pipe_fd_); - // recv_sock2 should not be used - check it. - zmq_assert (transport->recv_sock2 == -1); - - // Check if transport can receive data and can not send. - zmq_assert (transport->can_recv_data); - zmq_assert (!transport->can_send_data); - - // Take FDs directly from transport. - *receive_fd_ = pgm_transport_get_recv_fd (transport); - *waiting_pipe_fd_ = pgm_transport_get_pending_fd (transport); + socklen = sizeof (*receive_fd_); + rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, + &socklen); + zmq_assert (rc); + zmq_assert (socklen == sizeof (*receive_fd_)); + + socklen = sizeof (*waiting_pipe_fd_); + rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_, + &socklen); + zmq_assert (rc); + zmq_assert (socklen == sizeof (*waiting_pipe_fd_)); } // Get fds and store them into user allocated memory. -// sender_fd is from pgm_transport->send_sock. -// receive_fd_ is from transport->recv_sock. -// rdata_notify_fd_ is from transport->rdata_notify. -// pending_notify_fd_ is from transport->pending_notify. +// send_fd is for non-blocking send wire notifications. +// receive_fd_ is for incoming back-channel protocol packets. +// rdata_notify_fd_ is raised for waiting repair transmissions. +// pending_notify_fd_ is for state driven events. void zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, int *rdata_notify_fd_, int *pending_notify_fd_) { + socklen_t socklen; + bool rc; + zmq_assert (send_fd_); zmq_assert (receive_fd_); - zmq_assert (rdata_notify_fd_); zmq_assert (pending_notify_fd_); - // recv_sock2 should not be used - check it. - zmq_assert (transport->recv_sock2 == -1); - - // Check if transport can send data and can not receive. - zmq_assert (transport->can_send_data); - zmq_assert (!transport->can_recv_data); - - // Take FDs from transport. - *send_fd_ = pgm_transport_get_send_fd (transport); - *receive_fd_ = pgm_transport_get_recv_fd (transport); - - *rdata_notify_fd_ = pgm_transport_get_repair_fd (transport); - *pending_notify_fd_ = pgm_transport_get_pending_fd (transport); + socklen = sizeof (*send_fd_); + rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen); + zmq_assert (rc); + zmq_assert (socklen == sizeof (*receive_fd_)); + + socklen = sizeof (*receive_fd_); + rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, + &socklen); + zmq_assert (rc); + zmq_assert (socklen == sizeof (*receive_fd_)); + + socklen = sizeof (*rdata_notify_fd_); + rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_, + &socklen); + zmq_assert (rc); + zmq_assert (socklen == sizeof (*rdata_notify_fd_)); + + socklen = sizeof (*pending_notify_fd_); + rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, + pending_notify_fd_, &socklen); + zmq_assert (rc); + zmq_assert (socklen == sizeof (*pending_notify_fd_)); } // Send one APDU, transmit window owned memory. +// data_len_ must be less than one TPDU. size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) { size_t nbytes = 0; - PGMIOStatus status = pgm_send (transport, data_, data_len_, &nbytes); + const int status = pgm_send (sock, data_, data_len_, &nbytes); - if (nbytes != data_len_) { - zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED); - zmq_assert (nbytes == 0); - } - - // We have to write all data as one packet. - if (nbytes > 0) + // We have to write all data as one packet. + if (nbytes > 0) { + zmq_assert (status == PGM_IO_STATUS_NORMAL); zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_); + } else { + zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || + status == PGM_IO_STATUS_WOULD_BLOCK); + + if (status == PGM_IO_STATUS_RATE_LIMITED) + errno = ENOMEM; + else + errno = EBUSY; + } + + // Save return value. + last_tx_status = status; return nbytes; } +long zmq::pgm_socket_t::get_rx_timeout () +{ + if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED && + last_rx_status != PGM_IO_STATUS_TIMER_PENDING) + return -1; + + struct timeval tv; + socklen_t optlen = sizeof (tv); + const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, + last_rx_status == PGM_IO_STATUS_RATE_LIMITED ? PGM_RATE_REMAIN : + PGM_TIME_REMAIN, &tv, &optlen); + zmq_assert (rc); + + const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + + return timeout; +} + +long zmq::pgm_socket_t::get_tx_timeout () +{ + if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED) + return -1; + + struct timeval tv; + socklen_t optlen = sizeof (tv); + const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, + &optlen); + zmq_assert (rc); + + const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + + return timeout; +} + // Return max TSDU size without fragmentation from current PGM transport. size_t zmq::pgm_socket_t::get_max_tsdu_size () { - return (size_t) pgm_transport_max_tsdu (transport, false); + int max_tsdu = 0; + socklen_t optlen = sizeof (max_tsdu); + + bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen); + zmq_assert (rc); + zmq_assert (optlen == sizeof (max_tsdu)); + return (size_t) max_tsdu; } -// pgm_transport_recvmsgv is called to fill the pgm_msgv array up to -// pgm_msgv_len. In subsequent calls data from pgm_msgv structure are -// returned. +// pgm_recvmsgv is called to fill the pgm_msgv array up to pgm_msgv_len. +// In subsequent calls data from pgm_msgv structure are returned. ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) { size_t raw_data_len = 0; @@ -439,6 +544,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) nbytes_rec = 0; nbytes_processed = 0; pgm_msgv_processed = 0; + errno = EAGAIN; return 0; } @@ -453,15 +559,18 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // Receive a vector of Application Protocol Domain Unit's (APDUs) // from the transport. - GError *pgm_error = NULL; + pgm_error_t *pgm_error = NULL; - const PGMIOStatus status = pgm_recvmsgv (transport, pgm_msgv, - pgm_msgv_len, MSG_DONTWAIT, &nbytes_rec, &pgm_error); + const int status = pgm_recvmsgv (sock, pgm_msgv, + pgm_msgv_len, MSG_ERRQUEUE, &nbytes_rec, &pgm_error); + // Invalid parameters. zmq_assert (status != PGM_IO_STATUS_ERROR); + last_rx_status = status; + // In a case when no ODATA/RDATA fired POLLIN event (SPM...) - // pgm_recvmsg returns ?. + // pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING. if (status == PGM_IO_STATUS_TIMER_PENDING) { zmq_assert (nbytes_rec == 0); @@ -469,21 +578,44 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // In case if no RDATA/ODATA caused POLLIN 0 is // returned. nbytes_rec = 0; + errno = EBUSY; + return 0; + } + + // Send SPMR, NAK, ACK is rate limited. + if (status == PGM_IO_STATUS_RATE_LIMITED) { + + zmq_assert (nbytes_rec == 0); + + // In case if no RDATA/ODATA caused POLLIN 0 is returned. + nbytes_rec = 0; + errno = ENOMEM; + return 0; + } + + // No peers and hence no incoming packets. + if (status == PGM_IO_STATUS_WOULD_BLOCK) { + + zmq_assert (nbytes_rec == 0); + + // In case if no RDATA/ODATA caused POLLIN 0 is returned. + nbytes_rec = 0; + errno = EAGAIN; return 0; } // Data loss. if (status == PGM_IO_STATUS_RESET) { - pgm_peer_t* peer = (pgm_peer_t*) transport->peers_pending->data; + struct pgm_sk_buff_t* skb = pgm_msgv [0].msgv_skb [0]; // Save lost data TSI. - *tsi_ = &peer->tsi; + *tsi_ = &skb->tsi; nbytes_rec = 0; // In case of dala loss -1 is returned. errno = EINVAL; - g_error_free (pgm_error); + pgm_free_skb (skb); return -1; } @@ -494,6 +626,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) zmq_assert (pgm_msgv_processed <= pgm_msgv_len); } + // Zero byte payloads are valid in PGM, but not 0MQ protocol. zmq_assert (nbytes_rec > 0); // Only one APDU per pgm_msgv_t structure is allowed. @@ -522,16 +655,50 @@ void zmq::pgm_socket_t::process_upstream () pgm_msgv_t dummy_msg; size_t dummy_bytes = 0; - GError *pgm_error = NULL; + pgm_error_t *pgm_error = NULL; - PGMIOStatus status = pgm_recvmsgv (transport, &dummy_msg, - 1, MSG_DONTWAIT, &dummy_bytes, &pgm_error); + const int status = pgm_recvmsgv (sock, &dummy_msg, + 1, MSG_ERRQUEUE, &dummy_bytes, &pgm_error); + // Invalid parameters. zmq_assert (status != PGM_IO_STATUS_ERROR); // No data should be returned. zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || - status == PGM_IO_STATUS_RATE_LIMITED)); + status == PGM_IO_STATUS_RATE_LIMITED || + status == PGM_IO_STATUS_WOULD_BLOCK)); + + last_rx_status = status; + + if (status == PGM_IO_STATUS_TIMER_PENDING) + errno = EBUSY; + else if (status == PGM_IO_STATUS_RATE_LIMITED) + errno = ENOMEM; + else + errno = EAGAIN; +} + +int zmq::pgm_socket_t::compute_sqns (int tpdu_) +{ + // Convert rate into B/ms. + uint64_t rate = ((uint64_t) options.rate) / 8; + + // Get recovery interval in milliseconds. + uint64_t interval = options.recovery_ivl_msec >= 0 ? + options.recovery_ivl_msec : + options.recovery_ivl * 1000; + + // Compute the size of the buffer in bytes. + uint64_t size = interval * rate; + + // Translate the size into number of packets. + uint64_t sqns = size / tpdu_; + + // Buffer should be able to contain at least one packet. + if (sqns == 0) + sqns = 1; + + return (int) sqns; } #endif |