diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 96 | ||||
-rw-r--r-- | src/pgm_socket.cpp | 564 | ||||
-rw-r--r-- | src/pgm_socket.hpp | 2 | ||||
-rw-r--r-- | src/zmq.cpp | 26 |
4 files changed, 358 insertions, 330 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 89fc44a..5cd4f73 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -6,7 +6,21 @@ pkgconfig_DATA = libzmq.pc include_HEADERS = ../include/zmq.h ../include/zmq.hpp ../include/zmq_utils.h if BUILD_PGM -pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \ +noinst_LTLIBRARIES = libpgm.la + +nodist_libpgm_la_SOURCES = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/thread.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/mem.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/string.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/list.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/slist.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/queue.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/hashtable.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/messages.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/error.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/math.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet_parse.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet_test.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/sockaddr.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/time.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/if.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/getifaddrs.c \ @@ -16,28 +30,25 @@ pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/nametoindex.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/inet_network.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/md5.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/rand.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/gsi.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/tsi.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/signal.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/txwi.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/rxwi.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/transport.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/txw.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/rxw.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/skbuff.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/socket.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/source.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/receiver.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/recv.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/pgm.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/engine.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/timer.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/net.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/rate_control.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/async.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/checksum.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/reed_solomon.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/galois_tables.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/wsastrerror.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/glib-compat.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/backtrace.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/log.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/sockaddr.c \ + ../foreign/openpgm/@pgm_basename@/openpgm/pgm/histogram.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/version.c ../foreign/openpgm/@pgm_basename@/openpgm/pgm/version.c: ../foreign/openpgm/@pgm_basename@/openpgm/pgm/version_generator.py @@ -45,9 +56,9 @@ pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/galois_tables.c: ../foreign/openpgm/@pgm_basename@/openpgm/pgm/galois_generator.pl perl ../foreign/openpgm/@pgm_basename@/openpgm/pgm/galois_generator.pl > $@ -endif -nodist_libzmq_la_SOURCES = $(pgm_sources) +libpgm_la_LIBADD = @LTLIBOBJS@ +endif libzmq_la_SOURCES = \ array.hpp \ @@ -189,53 +200,44 @@ if BUILD_PGM if ON_MINGW libpgm_diff_flags = \ -D_WIN32_WINNT=0x0501 \ - -DCONFIG_16BIT_CHECKSUM \ - -DCONFIG_HAVE_IFR_NETMASK \ - -DCONFIG_BIND_INADDR_ANY \ - -DCONFIG_GALOIS_MUL_LUT \ - -DIF_NAMESIZE=256 \ - -DPGM_GNUC_INTERNAL=G_GNUC_INTERNAL \ + -DCONFIG_HAVE_ISO_VARARGS \ + -DCONFIG_HAVE_TSC \ -DCONFIG_HAVE_WSACMSGHDR \ - -DGETTEXT_PACKAGE='"pgm"' \ - -DG_LOG_DOMAIN='"Pgm"' + -DCONFIG_HAVE_DSO_VISIBILITY \ + -DCONFIG_BIND_INADDR_ANY else libpgm_diff_flags = \ - -D__need_IOV_MAX \ - -DCONFIG_16BIT_CHECKSUM \ + -DCONFIG_HAVE_GETPROTOBYNAME_R2 \ + -DCONFIG_HAVE_ISO_VARARGS \ + -DCONFIG_HAVE_ALLOCA_H \ + -DCONFIG_HAVE_PROC \ + -DCONFIG_HAVE_BACKTRACE \ -DCONFIG_HAVE_PSELECT \ - -DCONFIG_HAVE_POLL \ - -DCONFIG_HAVE_PPOLL \ - -DCONFIG_HAVE_EPOLL \ - -DCONFIG_HAVE_CLOCK_GETTIME \ - -DCONFIG_HAVE_CLOCK_NANOSLEEP \ - -DCONFIG_HAVE_NANOSLEEP \ - -DCONFIG_HAVE_USLEEP \ -DCONFIG_HAVE_RTC \ -DCONFIG_HAVE_TSC \ - -DCONFIG_HAVE_IFR_NETMASK \ + -DCONFIG_HAVE_HPET \ + -DCONFIG_HAVE_POLL \ + -DCONFIG_HAVE_EPOLL \ -DCONFIG_HAVE_GETIFADDRS \ - -DCONFIG_HAVE_GETHOSTBYNAME2 \ - -DCONFIG_HAVE_GETPROTOBYNAME_R \ - -DCONFIG_BIND_INADDR_ANY \ - -DCONFIG_GALOIS_MUL_LUT \ + -DCONFIG_HAVE_IFR_NETMASK \ -DCONFIG_HAVE_MCAST_JOIN \ -DCONFIG_HAVE_IP_MREQN \ -DCONFIG_HAVE_SPRINTF_GROUPING \ - -DCONFIG_HAVE_HPET \ - -DPGM_GNUC_INTERNAL=G_GNUC_INTERNAL \ - -DGETTEXT_PACKAGE='"pgm"' \ - -DG_LOG_DOMAIN='"Pgm"' + -DCONFIG_HAVE_VASPRINTF \ + -DCONFIG_HAVE_DSO_VISIBILITY \ + -DCONFIG_BIND_INADDR_ANY \ + -DCONFIG_HAVE_GETOPT endif -libzmq_la_CFLAGS = -I$(top_srcdir)/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/ @LIBZMQ_EXTRA_CXXFLAGS@ \ - -Wall \ - -pedantic \ +libpgm_la_CFLAGS = -I$(top_srcdir)/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/ @LIBZMQ_EXTRA_CXXFLAGS@ \ -std=gnu99 \ - -fno-strict-aliasing \ - --param max-inline-insns-single=600 \ + -D_XOPEN_SOURCE=600 \ + -D_BSD_SOURCE \ -D_REENTRANT \ - -D_GNU_SOURCE \ + -DCONFIG_16BIT_CHECKSUM \ + -DCONFIG_GALOIS_MUL_LUT \ + -DGETTEXT_PACKAGE='"pgm"' \ ${libpgm_diff_flags} libzmq_la_CXXFLAGS = -I$(top_srcdir)/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/ \ @@ -246,6 +248,10 @@ if BUILD_NO_PGM libzmq_la_CXXFLAGS = @LIBZMQ_EXTRA_CXXFLAGS@ endif +if BUILD_PGM +libzmq_la_LIBADD = libpgm.la +endif + dist-hook: -rm $(distdir)/platform.hpp diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 5a952a7..52983d9 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -41,7 +41,7 @@ #include "stdint.hpp" zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : - transport (NULL), + sock (NULL), options (options_), receiver (receiver_), pgm_msgv (NULL), @@ -55,10 +55,10 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) { // Can not open transport before destroying old one. - zmq_assert (transport == NULL); + zmq_assert (sock == NULL); - // Parse port number. - const char *port_delim = strchr (network_, ':'); + // Parse port number, start from end for IPv6 + const char *port_delim = strrchr (network_, ':'); if (!port_delim) { errno = EINVAL; return -1; @@ -73,261 +73,227 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) } memset (network, '\0', sizeof (network)); memcpy (network, network_, port_delim - network_); - - // Zero counter used in msgrecv. - nbytes_rec = 0; - nbytes_processed = 0; - pgm_msgv_processed = 0; - - int rc; - GError *pgm_error = NULL; - - // PGM transport GSI. - pgm_gsi_t gsi; - - std::string gsi_base; - - if (options.identity.size () > 0) { - - // Create gsi from identity. - // TODO: We assume that identity is standard C string here. - // What if it contains binary zeroes? - gsi_base.assign ((const char*) options.identity.data (), - options.identity.size ()); - } else { - - // Generate random gsi. - gsi_base = uuid_t ().to_string (); + + // Validate socket options + // Data rate is in [B/s]. options.rate is in [kb/s]. + if (options.rate <= 0) { + errno = EINVAL; + return -1; } - - rc = pgm_gsi_create_from_string (&gsi, gsi_base.c_str (), -1); - if (rc != TRUE) { + // Recovery interval [s]. + if (options.recovery_ivl <= 0) { errno = EINVAL; return -1; } - struct pgm_transport_info_t *res = NULL; - struct pgm_transport_info_t hint; - memset (&hint, 0, sizeof (hint)); - hint.ti_family = AF_INET; - - if (!pgm_if_get_transport_info (network, &hint, &res, &pgm_error)) { - if (pgm_error->domain == PGM_IF_ERROR && ( - pgm_error->code == PGM_IF_ERROR_INVAL || - pgm_error->code == PGM_IF_ERROR_XDEV || - pgm_error->code == PGM_IF_ERROR_NODEV || - pgm_error->code == PGM_IF_ERROR_NOTUNIQ || - pgm_error->code == PGM_IF_ERROR_ADDRFAMILY || - pgm_error->code == PGM_IF_ERROR_FAMILY || - pgm_error->code == PGM_IF_ERROR_NODATA || - pgm_error->code == PGM_IF_ERROR_NONAME || - pgm_error->code == PGM_IF_ERROR_SERVICE)) { - g_error_free (pgm_error); - errno = EINVAL; - return -1; - } + // Zero counter used in msgrecv. + nbytes_rec = 0; + nbytes_processed = 0; + pgm_msgv_processed = 0; + bool rc; + pgm_error_t *pgm_error = NULL; + struct pgm_addrinfo_t hints, *res = NULL; + sa_family_t sa_family; + + memset (&hints, 0, sizeof (hints)); + hints.ai_family = AF_UNSPEC; + if (!pgm_getaddrinfo (network, NULL, &res, &pgm_error)) { + if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && ( + pgm_error->code == PGM_ERROR_INVAL || + pgm_error->code == PGM_ERROR_XDEV || + pgm_error->code == PGM_ERROR_NODEV || + pgm_error->code == PGM_ERROR_NOTUNIQ || + pgm_error->code == PGM_ERROR_ADDRFAMILY || + pgm_error->code == PGM_ERROR_AFNOSUPPORT || + pgm_error->code == PGM_ERROR_NODATA || + pgm_error->code == PGM_ERROR_NONAME || + pgm_error->code == PGM_ERROR_SERVICE)) + goto err_abort; + +/* fatal OpenPGM API error */ zmq_assert (false); } - res->ti_gsi = gsi; - res->ti_dport = port_number; + // Pick up detected IP family + sa_family = res->ai_send_addrs[0].gsr_group.ss_family; - // If we are using UDP encapsulation update gsr or res. + // Create IP/PGM or UDP/PGM socket if (udp_encapsulation_) { - res->ti_udp_encap_ucast_port = port_number; - res->ti_udp_encap_mcast_port = port_number; - } - - if (!pgm_transport_create (&transport, res, &pgm_error)) { - if (pgm_error->domain == PGM_TRANSPORT_ERROR && ( - pgm_error->code == PGM_TRANSPORT_ERROR_INVAL || - pgm_error->code == PGM_TRANSPORT_ERROR_PERM || - pgm_error->code == PGM_TRANSPORT_ERROR_NODEV)) { - pgm_if_free_transport_info (res); - g_error_free (pgm_error); - errno = EINVAL; - return -1; + if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, &pgm_error)) { + if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && ( + pgm_error->code == PGM_ERROR_INVAL || + pgm_error->code == PGM_ERROR_NODEV)) + goto err_abort; + +/* fatal OpenPGM API error */ + zmq_assert (false); } - zmq_assert (false); + // All options are of data type int + const int encapsulation_port = port_number; + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &encapsulation_port, sizeof (encapsulation_port)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &encapsulation_port, sizeof (encapsulation_port))) + goto err_abort; + } else { + if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, &pgm_error)) { + if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && ( + pgm_error->code == PGM_ERROR_INVAL || + pgm_error->code == PGM_ERROR_PERM || + pgm_error->code == PGM_ERROR_NODEV)) + goto err_abort; + +/* fatal OpenPGM API error */ + zmq_assert (false); + } } - pgm_if_free_transport_info (res); - - // Common parameters for receiver and sender. + { + const int rcvbuf = (int) options.rcvbuf, + sndbuf = (int) options.sndbuf, + max_tpdu = (int) pgm_max_tpdu; + if (rcvbuf) { + if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof (rcvbuf))) + goto err_abort; + } + if (sndbuf) { + if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &sndbuf, sizeof (sndbuf))) + goto err_abort; + } // Set maximum transport protocol data unit size (TPDU). - rc = pgm_transport_set_max_tpdu (transport, pgm_max_tpdu); - if (rc != TRUE) { - errno = EINVAL; - return -1; - } - - // Set maximum number of network hops to cross. - rc = pgm_transport_set_hops (transport, 16); - if (rc != TRUE) { - errno = EINVAL; - return -1; - } - - // Set nonblocking send/recv sockets. - if (!pgm_transport_set_nonblocking (transport, true)) { - errno = EINVAL; - return -1; + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu, sizeof (max_tpdu))) + goto err_abort; } if (receiver) { - - // Receiver transport. - - // Note that NAKs are still generated by the transport. - rc = pgm_transport_set_recv_only (transport, true, false); - zmq_assert (rc == TRUE); - - if (options.rcvbuf) { - rc = pgm_transport_set_rcvbuf (transport, (int) options.rcvbuf); - if (rc != TRUE) - return -1; - } - - // Set NAK transmit back-off interval [us]. - rc = pgm_transport_set_nak_bo_ivl (transport, 50 * 1000); - zmq_assert (rc == TRUE); - - // Set timeout before repeating NAK [us]. - rc = pgm_transport_set_nak_rpt_ivl (transport, 200 * 1000); - zmq_assert (rc == TRUE); - - // Set timeout for receiving RDATA. - rc = pgm_transport_set_nak_rdata_ivl (transport, 200 * 1000); - zmq_assert (rc == TRUE); - - // Set retries for NAK without NCF/DATA (NAK_DATA_RETRIES). - rc = pgm_transport_set_nak_data_retries (transport, 5); - zmq_assert (rc == TRUE); - - // Set retries for NCF after NAK (NAK_NCF_RETRIES). - rc = pgm_transport_set_nak_ncf_retries (transport, 2); - zmq_assert (rc == TRUE); - - // Set timeout for removing a dead peer [us]. - rc = pgm_transport_set_peer_expiry (transport, 5 * 8192 * 1000); - zmq_assert (rc == TRUE); - - // Set expiration time of SPM Requests [us]. - rc = pgm_transport_set_spmr_expiry (transport, 25 * 1000); - zmq_assert (rc == TRUE); - - // Set the size of the receive window. - // Data rate is in [B/s]. options.rate is in [kb/s]. - if (options.rate <= 0) { - errno = EINVAL; - return -1; - } - rc = pgm_transport_set_rxw_max_rte (transport, - options.rate * 1000 / 8); - if (rc != TRUE) { - errno = EINVAL; - return -1; - } - - // Recovery interval [s]. - if (options.recovery_ivl <= 0) { - errno = EINVAL; - return -1; - } - rc = pgm_transport_set_rxw_secs (transport, options.recovery_ivl); - if (rc != TRUE) { - errno = EINVAL; - return -1; - } - + const int recv_only = 1, + rxw_max_rte = options.rate * 1000 / 8, + rxw_secs = options.recovery_ivl, + peer_expiry = 5 * pgm_msecs (8192), + spmr_expiry = pgm_msecs (25), + nak_bo_ivl = pgm_msecs (50), + nak_rpt_ivl = pgm_msecs (200), + nak_rdata_ivl = pgm_msecs (200), + nak_data_retries = 5, + nak_ncf_retries = 2; + + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only, sizeof (recv_only)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_MAX_RTE, &rxw_max_rte, sizeof (rxw_max_rte)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SECS, &rxw_secs, sizeof (rxw_secs)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, sizeof (peer_expiry)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof (spmr_expiry)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof (nak_bo_ivl)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof (nak_rpt_ivl)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof (nak_rdata_ivl)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof (nak_data_retries)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof (nak_ncf_retries))) + goto err_abort; } else { + const int send_only = 1, + txw_max_rte = options.rate * 1000 / 8, + txw_secs = options.recovery_ivl, + ambient_spm = pgm_msecs (8192), + heartbeat_spm[] = { pgm_msecs (4), + pgm_msecs (4), + pgm_msecs (8), + pgm_msecs (16), + pgm_msecs (32), + pgm_msecs (64), + pgm_msecs (128), + pgm_msecs (256), + pgm_msecs (512), + pgm_msecs (1024), + pgm_msecs (2048), + pgm_msecs (4096), + pgm_msecs (8192) }; + + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY, &send_only, sizeof (send_only)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_MAX_RTE, &txw_max_rte, sizeof (txw_max_rte)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SECS, &txw_secs, sizeof (txw_secs)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM, &ambient_spm, sizeof (ambient_spm)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM, &heartbeat_spm, sizeof (heartbeat_spm))) + goto err_abort; + } - // Sender transport. + // PGM transport GSI. + struct pgm_sockaddr_t addr; - // Waiting pipe won't be read. - rc = pgm_transport_set_send_only (transport, TRUE); - zmq_assert (rc == TRUE); + memset (&addr, 0, sizeof(addr)); + addr.sa_port = port_number; + addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT; - if (options.sndbuf) { - rc = pgm_transport_set_sndbuf (transport, (int) options.sndbuf); - if (rc != TRUE) - return -1; - } + if (options.identity.size () > 0) { - // Set the size of the send window. - // Data rate is in [B/s] options.rate is in [kb/s]. - if (options.rate <= 0) { - errno = EINVAL; - return -1; - } - rc = pgm_transport_set_txw_max_rte (transport, - options.rate * 1000 / 8); - if (rc != TRUE) { - errno = EINVAL; - return -1; - } + // Create gsi from identity. + if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, options.identity.data (), options.identity.size ())) + goto err_abort; + } else { - // Recovery interval [s]. - if (options.recovery_ivl <= 0) { - errno = EINVAL; - return -1; - } - rc = pgm_transport_set_txw_secs (transport, options.recovery_ivl); - if (rc != TRUE) { - errno = EINVAL; - return -1; - } + // Generate random gsi. + std::string gsi_base = uuid_t ().to_string (); + if (!pgm_gsi_create_from_string (&addr.sa_addr.gsi, gsi_base.c_str (), -1)) + goto err_abort; + } - // Set interval of background SPM packets [us]. - rc = pgm_transport_set_ambient_spm (transport, 8192 * 1000); - zmq_assert (rc == TRUE); - - // Set intervals of data flushing SPM packets [us]. - guint spm_heartbeat[] = {4 * 1000, 4 * 1000, 8 * 1000, 16 * 1000, - 32 * 1000, 64 * 1000, 128 * 1000, 256 * 1000, 512 * 1000, - 1024 * 1000, 2048 * 1000, 4096 * 1000, 8192 * 1000}; - rc = pgm_transport_set_heartbeat_spm (transport, spm_heartbeat, - G_N_ELEMENTS(spm_heartbeat)); - zmq_assert (rc == TRUE); + // Bind a transport to the specified network devices. + struct pgm_interface_req_t if_req; + memset (&if_req, 0, sizeof(if_req)); + if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface; + if_req.ir_scope_id = 0; + if (AF_INET6 == sa_family) { + struct sockaddr_in6 sa6; + memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof (sa6)); + if_req.ir_scope_id = sa6.sin6_scope_id; } - - // Enable multicast loopback. - if (options.use_multicast_loop) { - rc = pgm_transport_set_multicast_loop (transport, true); - zmq_assert (rc == TRUE); + if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req), &if_req, sizeof (if_req), &pgm_error)) { + if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && ( + pgm_error->code == PGM_ERROR_INVAL || + pgm_error->code == PGM_ERROR_XDEV || + pgm_error->code == PGM_ERROR_NODEV || + pgm_error->code == PGM_ERROR_NOTUNIQ || + pgm_error->code == PGM_ERROR_ADDRFAMILY || + pgm_error->code == PGM_ERROR_AFNOSUPPORT || + pgm_error->code == PGM_ERROR_NODATA || + pgm_error->code == PGM_ERROR_NONAME || + pgm_error->code == PGM_ERROR_SERVICE)) + goto err_abort; + if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && ( + pgm_error->code == PGM_ERROR_FAILED)) + goto err_abort; + +/* fatal OpenPGM API error */ + zmq_assert (false); } - // Bind a transport to the specified network devices. - if (!pgm_transport_bind (transport, &pgm_error)) { - if (pgm_error->domain == PGM_IF_ERROR && ( - pgm_error->code == PGM_IF_ERROR_INVAL || - pgm_error->code == PGM_IF_ERROR_XDEV || - pgm_error->code == PGM_IF_ERROR_NODEV || - pgm_error->code == PGM_IF_ERROR_NOTUNIQ || - pgm_error->code == PGM_IF_ERROR_ADDRFAMILY || - pgm_error->code == PGM_IF_ERROR_FAMILY || - pgm_error->code == PGM_IF_ERROR_NODATA || - pgm_error->code == PGM_IF_ERROR_NONAME || - pgm_error->code == PGM_IF_ERROR_SERVICE)) { - g_error_free (pgm_error); - errno = EINVAL; - return -1; - } - if (pgm_error->domain == PGM_TRANSPORT_ERROR && ( - pgm_error->code == PGM_TRANSPORT_ERROR_FAILED)) { - g_error_free (pgm_error); - errno = EINVAL; - return -1; - } + // Join IP multicast groups + for (unsigned i = 0; i < res->ai_recv_addrs_len; i++) + { + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_JOIN_GROUP, &res->ai_recv_addrs[i], sizeof (struct group_req))) + goto err_abort; + } + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP, &res->ai_send_addrs[0], sizeof (struct group_req))) + goto err_abort; + pgm_freeaddrinfo (res); - zmq_assert (false); + // Set IP level parameters + { + const int nonblocking = 1, + multicast_loop = options.use_multicast_loop ? 1 : 0, + multicast_hops = 16, + dscp = 0x2e << 2; /* Expedited Forwarding PHB for network elements, no ECN. */ + + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP, &multicast_loop, sizeof (multicast_loop)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, &multicast_hops, sizeof (multicast_hops))) + goto err_abort; + if (AF_INET6 != sa_family && + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS, &dscp, sizeof (dscp))); + goto err_abort; + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking, sizeof (nonblocking))) + goto err_abort; } // For receiver transport preallocate pgm_msgv array. - // TODO: ? if (receiver) { zmq_assert (in_batch_size > 0); size_t max_tsdu_size = get_max_tsdu_size (); @@ -340,74 +306,102 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) } return 0; + +err_abort: + if (sock != NULL) { + pgm_close (sock, FALSE); + sock = NULL; + } + if (res != NULL) { + pgm_freeaddrinfo (res); + res = NULL; + } + if (pgm_error != NULL) { + pgm_error_free (pgm_error); + pgm_error = NULL; + } + errno = EINVAL; + return -1; } zmq::pgm_socket_t::~pgm_socket_t () { if (pgm_msgv) free (pgm_msgv); - if (transport) - pgm_transport_destroy (transport, TRUE); + if (sock) + pgm_close (sock, TRUE); } -// Get receiver fds. recv_fd is from transport->recv_sock -// waiting_pipe_fd is from transport->waiting_pipe [0] +// Get receiver fds. receive_fd_ is signaled for incoming +// packets, waiting_pipe_fd_ is signaled for state driven +// events and data. void zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_, int *waiting_pipe_fd_) { + socklen_t socklen; + bool rc; + zmq_assert (receive_fd_); zmq_assert (waiting_pipe_fd_); - // recv_sock2 should not be used - check it. - zmq_assert (transport->recv_sock2 == -1); + socklen = sizeof (*receive_fd_); + rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, &socklen); + zmq_assert (rc); + zmq_assert (socklen == sizeof (*receive_fd_)); - // Check if transport can receive data and can not send. - zmq_assert (transport->can_recv_data); - zmq_assert (!transport->can_send_data); - - // Take FDs directly from transport. - *receive_fd_ = pgm_transport_get_recv_fd (transport); - *waiting_pipe_fd_ = pgm_transport_get_pending_fd (transport); + socklen = sizeof (*waiting_pipe_fd_); + rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_, &socklen); + zmq_assert (rc); + zmq_assert (socklen == sizeof (*waiting_pipe_fd_)); } // Get fds and store them into user allocated memory. -// sender_fd is from pgm_transport->send_sock. -// receive_fd_ is from transport->recv_sock. -// rdata_notify_fd_ is from transport->rdata_notify. -// pending_notify_fd_ is from transport->pending_notify. +// send_fd is for non-blocking send wire notifications. +// receive_fd_ is for incoming back-channel protocol packets. +// rdata_notify_fd_ is raised for waiting repair transmissions. +// pending_notify_fd_ is for state driven events. void zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, int *rdata_notify_fd_, int *pending_notify_fd_) { + socklen_t socklen; + bool rc; + zmq_assert (send_fd_); zmq_assert (receive_fd_); - zmq_assert (rdata_notify_fd_); zmq_assert (pending_notify_fd_); - // recv_sock2 should not be used - check it. - zmq_assert (transport->recv_sock2 == -1); - - // Check if transport can send data and can not receive. - zmq_assert (transport->can_send_data); - zmq_assert (!transport->can_recv_data); - - // Take FDs from transport. - *send_fd_ = pgm_transport_get_send_fd (transport); - *receive_fd_ = pgm_transport_get_recv_fd (transport); - - *rdata_notify_fd_ = pgm_transport_get_repair_fd (transport); - *pending_notify_fd_ = pgm_transport_get_pending_fd (transport); + socklen = sizeof (*send_fd_); + rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen); + zmq_assert (rc); + zmq_assert (socklen == sizeof (*receive_fd_)); + + socklen = sizeof (*receive_fd_); + rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, &socklen); + zmq_assert (rc); + zmq_assert (socklen == sizeof (*receive_fd_)); + + socklen = sizeof (*rdata_notify_fd_); + rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_, &socklen); + zmq_assert (rc); + zmq_assert (socklen == sizeof (*rdata_notify_fd_)); + + socklen = sizeof (*pending_notify_fd_); + rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, pending_notify_fd_, &socklen); + zmq_assert (rc); + zmq_assert (socklen == sizeof (*pending_notify_fd_)); } // Send one APDU, transmit window owned memory. +// data_len_ must be less than one TPDU. size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) { size_t nbytes = 0; - PGMIOStatus status = pgm_send (transport, data_, data_len_, &nbytes); + const int status = pgm_send (sock, data_, data_len_, &nbytes); if (nbytes != data_len_) { - zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED); + zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK); zmq_assert (nbytes == 0); } @@ -421,10 +415,16 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) // Return max TSDU size without fragmentation from current PGM transport. size_t zmq::pgm_socket_t::get_max_tsdu_size () { - return (size_t) pgm_transport_max_tsdu (transport, false); + int max_tsdu = 0; + socklen_t optlen = sizeof (max_tsdu); + + bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen); + zmq_assert (rc); + zmq_assert (optlen == sizeof (max_tsdu)); + return (size_t) max_tsdu; } -// pgm_transport_recvmsgv is called to fill the pgm_msgv array up to +// pgm_recvmsgv is called to fill the pgm_msgv array up to // pgm_msgv_len. In subsequent calls data from pgm_msgv structure are // returned. ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) @@ -453,15 +453,15 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // Receive a vector of Application Protocol Domain Unit's (APDUs) // from the transport. - GError *pgm_error = NULL; + pgm_error_t *pgm_error = NULL; - const PGMIOStatus status = pgm_recvmsgv (transport, pgm_msgv, - pgm_msgv_len, MSG_DONTWAIT, &nbytes_rec, &pgm_error); + const int status = pgm_recvmsgv (sock, pgm_msgv, + pgm_msgv_len, MSG_ERRQUEUE, &nbytes_rec, &pgm_error); zmq_assert (status != PGM_IO_STATUS_ERROR); // In a case when no ODATA/RDATA fired POLLIN event (SPM...) - // pgm_recvmsg returns ?. + // pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING. if (status == PGM_IO_STATUS_TIMER_PENDING) { zmq_assert (nbytes_rec == 0); @@ -472,18 +472,40 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) return 0; } + // Send SPMR, NAK, ACK is rate limited. + if (status == PGM_IO_STATUS_RATE_LIMITED) { + + zmq_assert (nbytes_rec == 0); + + // In case if no RDATA/ODATA caused POLLIN 0 is + // returned. + nbytes_rec = 0; + return 0; + } + + // No peers and hence no incoming packets. + if (status == PGM_IO_STATUS_WOULD_BLOCK) { + + zmq_assert (nbytes_rec == 0); + + // In case if no RDATA/ODATA caused POLLIN 0 is + // returned. + nbytes_rec = 0; + return 0; + } + // Data loss. if (status == PGM_IO_STATUS_RESET) { - pgm_peer_t* peer = (pgm_peer_t*) transport->peers_pending->data; + struct pgm_sk_buff_t* skb = pgm_msgv[0].msgv_skb[0]; // Save lost data TSI. - *tsi_ = &peer->tsi; + *tsi_ = &skb->tsi; nbytes_rec = 0; // In case of dala loss -1 is returned. errno = EINVAL; - g_error_free (pgm_error); + pgm_free_skb (skb); return -1; } @@ -522,16 +544,16 @@ void zmq::pgm_socket_t::process_upstream () pgm_msgv_t dummy_msg; size_t dummy_bytes = 0; - GError *pgm_error = NULL; + pgm_error_t *pgm_error = NULL; - PGMIOStatus status = pgm_recvmsgv (transport, &dummy_msg, - 1, MSG_DONTWAIT, &dummy_bytes, &pgm_error); + const int status = pgm_recvmsgv (sock, &dummy_msg, + 1, MSG_ERRQUEUE, &dummy_bytes, &pgm_error); zmq_assert (status != PGM_IO_STATUS_ERROR); // No data should be returned. zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || - status == PGM_IO_STATUS_RATE_LIMITED)); + status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK)); } #endif diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index b9f55d1..334e42f 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -74,7 +74,7 @@ namespace zmq private: // OpenPGM transport - pgm_transport_t* transport; + pgm_sock_t* sock; // Associated socket options. options_t options; diff --git a/src/zmq.cpp b/src/zmq.cpp index 3f1d88b..5b59802 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -242,20 +242,20 @@ void *zmq_init (int io_threads_) // protocol ID. Note that if you want to use gettimeofday and sleep for // openPGM timing, set environment variables PGM_TIMER to "GTOD" and // PGM_SLEEP to "USLEEP". - GError *pgm_error = NULL; - int rc = pgm_init (&pgm_error); + pgm_error_t *pgm_error = NULL; + bool rc = pgm_init (&pgm_error); if (rc != TRUE) { - if (pgm_error->domain == PGM_IF_ERROR && ( - pgm_error->code == PGM_IF_ERROR_INVAL || - pgm_error->code == PGM_IF_ERROR_XDEV || - pgm_error->code == PGM_IF_ERROR_NODEV || - pgm_error->code == PGM_IF_ERROR_NOTUNIQ || - pgm_error->code == PGM_IF_ERROR_ADDRFAMILY || - pgm_error->code == PGM_IF_ERROR_FAMILY || - pgm_error->code == PGM_IF_ERROR_NODATA || - pgm_error->code == PGM_IF_ERROR_NONAME || - pgm_error->code == PGM_IF_ERROR_SERVICE)) { - g_error_free (pgm_error); + if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && ( + pgm_error->code == PGM_ERROR_INVAL || + pgm_error->code == PGM_ERROR_XDEV || + pgm_error->code == PGM_ERROR_NODEV || + pgm_error->code == PGM_ERROR_NOTUNIQ || + pgm_error->code == PGM_ERROR_ADDRFAMILY || + pgm_error->code == PGM_ERROR_AFNOSUPPORT || + pgm_error->code == PGM_ERROR_NODATA || + pgm_error->code == PGM_ERROR_NONAME || + pgm_error->code == PGM_ERROR_SERVICE)) { + pgm_error_free (pgm_error); errno = EINVAL; return NULL; } |