From 96d85b20982926e60d5065cba3203971c9eeed63 Mon Sep 17 00:00:00 2001 From: Steven McCoy Date: Tue, 28 Sep 2010 16:58:51 +0200 Subject: * Add assertions to check for OpenPGM calls with invalid parameters. * Assertion to check that pgm_getaddrinfo is actually returning something. * Missing pgm_connect call. * Typo on TOS causing immediate abort. * Placeholder calls for timeouts whilst continuing spin loop functionality. * OpenPGM v5 now supports reference counting so remove init checks. * Duplicate UDP unicast port setting, requires one unicast and one multicast. * Incorrectly set socket rcvbuf size with sndbuf. * Replace std::lexicographical_compare of TSI's with long word integer comparisons. * pgm_socket_t::receive returns -1 on no data. --- src/pgm_receiver.cpp | 2 +- src/pgm_receiver.hpp | 12 +++--- src/pgm_socket.cpp | 109 +++++++++++++++++++++++++++++++++------------------ src/zmq.cpp | 24 +++++------- 4 files changed, 87 insertions(+), 60 deletions(-) diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 5532546..ceae0da 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -144,7 +144,7 @@ void zmq::pgm_receiver_t::in_event () // No data to process. This may happen if the packet received is // neither ODATA nor ODATA. - if (received == 0) + if (received < 0) break; // Find the peer based on its TSI. diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index bbdb31d..f32d37e 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -73,15 +73,13 @@ namespace zmq struct tsi_comp { - inline bool operator () (const pgm_tsi_t <si, + bool operator () (const pgm_tsi_t <si, const pgm_tsi_t &rtsi) const { - if (ltsi.sport < rtsi.sport) - return true; - - return (std::lexicographical_compare (ltsi.gsi.identifier, - ltsi.gsi.identifier + 6, - rtsi.gsi.identifier, rtsi.gsi.identifier + 6)); + uint32_t ll[2], rl[2]; + memcpy (ll, <si, sizeof (ll)); + memcpy (rl, &rtsi, sizeof (rl)); + return (ll[0] < rl[0]) || (ll[0] == rl[0] && ll[1] < rl[1]); } }; diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 52983d9..3d55e69 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -52,6 +52,11 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : { } +// Create, bind and connect PGM socket. +// network_ of the form : +// e.g. eth0;239.192.0.1:7500 +// link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000 +// ;[fe80::1%en0]:7500 int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) { // Can not open transport before destroying old one. @@ -99,51 +104,59 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) memset (&hints, 0, sizeof (hints)); hints.ai_family = AF_UNSPEC; if (!pgm_getaddrinfo (network, NULL, &res, &pgm_error)) { +// Invalid parameters don't set pgm_error_t + zmq_assert (pgm_error != NULL); if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && ( - pgm_error->code == PGM_ERROR_INVAL || - pgm_error->code == PGM_ERROR_XDEV || - pgm_error->code == PGM_ERROR_NODEV || - pgm_error->code == PGM_ERROR_NOTUNIQ || - pgm_error->code == PGM_ERROR_ADDRFAMILY || - pgm_error->code == PGM_ERROR_AFNOSUPPORT || - pgm_error->code == PGM_ERROR_NODATA || - pgm_error->code == PGM_ERROR_NONAME || - pgm_error->code == PGM_ERROR_SERVICE)) +// NB: cannot catch EAI_BADFLAGS + pgm_error->code != PGM_ERROR_SERVICE && + pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) +// User, host, or network configuration or transient error goto err_abort; -/* fatal OpenPGM API error */ +// Fatal OpenPGM internal error zmq_assert (false); } + zmq_assert (res != NULL); + // Pick up detected IP family sa_family = res->ai_send_addrs[0].gsr_group.ss_family; // Create IP/PGM or UDP/PGM socket if (udp_encapsulation_) { if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, &pgm_error)) { +// Invalid parameters don't set pgm_error_t + zmq_assert (pgm_error != NULL); if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && ( - pgm_error->code == PGM_ERROR_INVAL || - pgm_error->code == PGM_ERROR_NODEV)) + pgm_error->code != PGM_ERROR_BADF && + pgm_error->code != PGM_ERROR_FAULT && + pgm_error->code != PGM_ERROR_NOPROTOOPT && + pgm_error->code != PGM_ERROR_FAILED)) +// User, host, or network configuration or transient error goto err_abort; -/* fatal OpenPGM API error */ +// Fatal OpenPGM internal error zmq_assert (false); } // All options are of data type int const int encapsulation_port = port_number; if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &encapsulation_port, sizeof (encapsulation_port)) || - !pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &encapsulation_port, sizeof (encapsulation_port))) + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, &encapsulation_port, sizeof (encapsulation_port))) goto err_abort; } else { if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, &pgm_error)) { +// Invalid parameters don't set pgm_error_t + zmq_assert (pgm_error != NULL); if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && ( - pgm_error->code == PGM_ERROR_INVAL || - pgm_error->code == PGM_ERROR_PERM || - pgm_error->code == PGM_ERROR_NODEV)) + pgm_error->code != PGM_ERROR_BADF && + pgm_error->code != PGM_ERROR_FAULT && + pgm_error->code != PGM_ERROR_NOPROTOOPT && + pgm_error->code != PGM_ERROR_FAILED)) +// User, host, or network configuration or transient error goto err_abort; -/* fatal OpenPGM API error */ +// Fatal OpenPGM internal error zmq_assert (false); } } @@ -157,7 +170,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) goto err_abort; } if (sndbuf) { - if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &sndbuf, sizeof (sndbuf))) + if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof (sndbuf))) goto err_abort; } @@ -247,22 +260,17 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if_req.ir_scope_id = sa6.sin6_scope_id; } if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req), &if_req, sizeof (if_req), &pgm_error)) { - if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && ( - pgm_error->code == PGM_ERROR_INVAL || - pgm_error->code == PGM_ERROR_XDEV || - pgm_error->code == PGM_ERROR_NODEV || - pgm_error->code == PGM_ERROR_NOTUNIQ || - pgm_error->code == PGM_ERROR_ADDRFAMILY || - pgm_error->code == PGM_ERROR_AFNOSUPPORT || - pgm_error->code == PGM_ERROR_NODATA || - pgm_error->code == PGM_ERROR_NONAME || - pgm_error->code == PGM_ERROR_SERVICE)) - goto err_abort; - if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && ( - pgm_error->code == PGM_ERROR_FAILED)) +// Invalid parameters don't set pgm_error_t + zmq_assert (pgm_error != NULL); + if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET || + pgm_error->domain == PGM_ERROR_DOMAIN_IF) && ( + pgm_error->code != PGM_ERROR_INVAL && + pgm_error->code != PGM_ERROR_BADF && + pgm_error->code != PGM_ERROR_FAULT)) +// User, host, or network configuration or transient error goto err_abort; -/* fatal OpenPGM API error */ +// Fatal OpenPGM internal error zmq_assert (false); } @@ -274,7 +282,9 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) } if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP, &res->ai_send_addrs[0], sizeof (struct group_req))) goto err_abort; + pgm_freeaddrinfo (res); + res = NULL; // Set IP level parameters { @@ -287,12 +297,19 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) !pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, &multicast_hops, sizeof (multicast_hops))) goto err_abort; if (AF_INET6 != sa_family && - !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS, &dscp, sizeof (dscp))); + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS, &dscp, sizeof (dscp))) goto err_abort; if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking, sizeof (nonblocking))) goto err_abort; } + // Connect PGM transport to start state machine. + if (!pgm_connect (sock, &pgm_error)) { +// Invalid parameters don't set pgm_error_t + zmq_assert (pgm_error != NULL); + goto err_abort; + } + // For receiver transport preallocate pgm_msgv array. if (receiver) { zmq_assert (in_batch_size > 0); @@ -439,7 +456,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) nbytes_rec = 0; nbytes_processed = 0; pgm_msgv_processed = 0; - return 0; + errno = EAGAIN; + return -1; } // If we have are going first time or if we have processed all pgm_msgv_t @@ -458,6 +476,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) const int status = pgm_recvmsgv (sock, pgm_msgv, pgm_msgv_len, MSG_ERRQUEUE, &nbytes_rec, &pgm_error); + // Invalid parameters zmq_assert (status != PGM_IO_STATUS_ERROR); // In a case when no ODATA/RDATA fired POLLIN event (SPM...) @@ -466,10 +485,17 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) zmq_assert (nbytes_rec == 0); + struct timeval tv; + socklen_t optlen = sizeof (tv); + const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &optlen); + + zmq_assert (rc); + // In case if no RDATA/ODATA caused POLLIN 0 is // returned. nbytes_rec = 0; - return 0; + errno = EBUSY; + return -1; } // Send SPMR, NAK, ACK is rate limited. @@ -477,10 +503,15 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) zmq_assert (nbytes_rec == 0); + struct timeval tv; + socklen_t optlen = sizeof (tv); + const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen); + // In case if no RDATA/ODATA caused POLLIN 0 is // returned. nbytes_rec = 0; - return 0; + errno = EBUSY; + return -1; } // No peers and hence no incoming packets. @@ -491,7 +522,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // In case if no RDATA/ODATA caused POLLIN 0 is // returned. nbytes_rec = 0; - return 0; + errno = EAGAIN; + return -1; } // Data loss. @@ -549,6 +581,7 @@ void zmq::pgm_socket_t::process_upstream () const int status = pgm_recvmsgv (sock, &dummy_msg, 1, MSG_ERRQUEUE, &dummy_bytes, &pgm_error); + // Invalid parameters zmq_assert (status != PGM_IO_STATUS_ERROR); // No data should be returned. diff --git a/src/zmq.cpp b/src/zmq.cpp index 5b59802..306a85d 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -234,31 +234,27 @@ void *zmq_init (int io_threads_) } #if defined ZMQ_HAVE_OPENPGM - // Unfortunately, OpenPGM doesn't support refcounted init/shutdown, thus, - // let's fail if it was initialised beforehand. - zmq_assert (!pgm_supported ()); // Init PGM transport. Ensure threading and timer are enabled. Find PGM // protocol ID. Note that if you want to use gettimeofday and sleep for // openPGM timing, set environment variables PGM_TIMER to "GTOD" and // PGM_SLEEP to "USLEEP". pgm_error_t *pgm_error = NULL; - bool rc = pgm_init (&pgm_error); + const bool rc = pgm_init (&pgm_error); if (rc != TRUE) { - if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && ( - pgm_error->code == PGM_ERROR_INVAL || - pgm_error->code == PGM_ERROR_XDEV || - pgm_error->code == PGM_ERROR_NODEV || - pgm_error->code == PGM_ERROR_NOTUNIQ || - pgm_error->code == PGM_ERROR_ADDRFAMILY || - pgm_error->code == PGM_ERROR_AFNOSUPPORT || - pgm_error->code == PGM_ERROR_NODATA || - pgm_error->code == PGM_ERROR_NONAME || - pgm_error->code == PGM_ERROR_SERVICE)) { + + // Invalid parameters don't set pgm_error_t + zmq_assert (pgm_error != NULL); + if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && ( + pgm_error->code == PGM_ERROR_FAILED)) { + + // Failed to access RTC or HPET device. pgm_error_free (pgm_error); errno = EINVAL; return NULL; } + + // PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg. zmq_assert (false); } #endif -- cgit v1.2.3