summaryrefslogtreecommitdiff
path: root/src/pgm_socket.cpp
diff options
context:
space:
mode:
authorSteven McCoy <steven.mccoy@miru.hk>2010-09-28 16:58:51 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-09-30 09:11:19 +0200
commit96d85b20982926e60d5065cba3203971c9eeed63 (patch)
treec2d29faea3ec82b836b47e9d73de8f3d0994591e /src/pgm_socket.cpp
parent00cd7d49c7f2b532b2349581b82577bc714f9bf8 (diff)
* Add assertions to check for OpenPGM calls with invalid parameters.
* Assertion to check that pgm_getaddrinfo is actually returning something. * Missing pgm_connect call. * Typo on TOS causing immediate abort. * Placeholder calls for timeouts whilst continuing spin loop functionality. * OpenPGM v5 now supports reference counting so remove init checks. * Duplicate UDP unicast port setting, requires one unicast and one multicast. * Incorrectly set socket rcvbuf size with sndbuf. * Replace std::lexicographical_compare of TSI's with long word integer comparisons. * pgm_socket_t::receive returns -1 on no data.
Diffstat (limited to 'src/pgm_socket.cpp')
-rw-r--r--src/pgm_socket.cpp109
1 files changed, 71 insertions, 38 deletions
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 52983d9..3d55e69 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -52,6 +52,11 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
{
}
+// Create, bind and connect PGM socket.
+// network_ of the form <interface & multicast group decls>:<IP port>
+// e.g. eth0;239.192.0.1:7500
+// link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000
+// ;[fe80::1%en0]:7500
int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
{
// Can not open transport before destroying old one.
@@ -99,51 +104,59 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
memset (&hints, 0, sizeof (hints));
hints.ai_family = AF_UNSPEC;
if (!pgm_getaddrinfo (network, NULL, &res, &pgm_error)) {
+// Invalid parameters don't set pgm_error_t
+ zmq_assert (pgm_error != NULL);
if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && (
- pgm_error->code == PGM_ERROR_INVAL ||
- pgm_error->code == PGM_ERROR_XDEV ||
- pgm_error->code == PGM_ERROR_NODEV ||
- pgm_error->code == PGM_ERROR_NOTUNIQ ||
- pgm_error->code == PGM_ERROR_ADDRFAMILY ||
- pgm_error->code == PGM_ERROR_AFNOSUPPORT ||
- pgm_error->code == PGM_ERROR_NODATA ||
- pgm_error->code == PGM_ERROR_NONAME ||
- pgm_error->code == PGM_ERROR_SERVICE))
+// NB: cannot catch EAI_BADFLAGS
+ pgm_error->code != PGM_ERROR_SERVICE &&
+ pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT))
+// User, host, or network configuration or transient error
goto err_abort;
-/* fatal OpenPGM API error */
+// Fatal OpenPGM internal error
zmq_assert (false);
}
+ zmq_assert (res != NULL);
+
// Pick up detected IP family
sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
// Create IP/PGM or UDP/PGM socket
if (udp_encapsulation_) {
if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, &pgm_error)) {
+// Invalid parameters don't set pgm_error_t
+ zmq_assert (pgm_error != NULL);
if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
- pgm_error->code == PGM_ERROR_INVAL ||
- pgm_error->code == PGM_ERROR_NODEV))
+ pgm_error->code != PGM_ERROR_BADF &&
+ pgm_error->code != PGM_ERROR_FAULT &&
+ pgm_error->code != PGM_ERROR_NOPROTOOPT &&
+ pgm_error->code != PGM_ERROR_FAILED))
+// User, host, or network configuration or transient error
goto err_abort;
-/* fatal OpenPGM API error */
+// Fatal OpenPGM internal error
zmq_assert (false);
}
// All options are of data type int
const int encapsulation_port = port_number;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &encapsulation_port, sizeof (encapsulation_port)) ||
- !pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &encapsulation_port, sizeof (encapsulation_port)))
+ !pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, &encapsulation_port, sizeof (encapsulation_port)))
goto err_abort;
} else {
if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, &pgm_error)) {
+// Invalid parameters don't set pgm_error_t
+ zmq_assert (pgm_error != NULL);
if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
- pgm_error->code == PGM_ERROR_INVAL ||
- pgm_error->code == PGM_ERROR_PERM ||
- pgm_error->code == PGM_ERROR_NODEV))
+ pgm_error->code != PGM_ERROR_BADF &&
+ pgm_error->code != PGM_ERROR_FAULT &&
+ pgm_error->code != PGM_ERROR_NOPROTOOPT &&
+ pgm_error->code != PGM_ERROR_FAILED))
+// User, host, or network configuration or transient error
goto err_abort;
-/* fatal OpenPGM API error */
+// Fatal OpenPGM internal error
zmq_assert (false);
}
}
@@ -157,7 +170,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
goto err_abort;
}
if (sndbuf) {
- if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &sndbuf, sizeof (sndbuf)))
+ if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof (sndbuf)))
goto err_abort;
}
@@ -247,22 +260,17 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if_req.ir_scope_id = sa6.sin6_scope_id;
}
if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req), &if_req, sizeof (if_req), &pgm_error)) {
- if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && (
- pgm_error->code == PGM_ERROR_INVAL ||
- pgm_error->code == PGM_ERROR_XDEV ||
- pgm_error->code == PGM_ERROR_NODEV ||
- pgm_error->code == PGM_ERROR_NOTUNIQ ||
- pgm_error->code == PGM_ERROR_ADDRFAMILY ||
- pgm_error->code == PGM_ERROR_AFNOSUPPORT ||
- pgm_error->code == PGM_ERROR_NODATA ||
- pgm_error->code == PGM_ERROR_NONAME ||
- pgm_error->code == PGM_ERROR_SERVICE))
- goto err_abort;
- if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
- pgm_error->code == PGM_ERROR_FAILED))
+// Invalid parameters don't set pgm_error_t
+ zmq_assert (pgm_error != NULL);
+ if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET ||
+ pgm_error->domain == PGM_ERROR_DOMAIN_IF) && (
+ pgm_error->code != PGM_ERROR_INVAL &&
+ pgm_error->code != PGM_ERROR_BADF &&
+ pgm_error->code != PGM_ERROR_FAULT))
+// User, host, or network configuration or transient error
goto err_abort;
-/* fatal OpenPGM API error */
+// Fatal OpenPGM internal error
zmq_assert (false);
}
@@ -274,7 +282,9 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
}
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP, &res->ai_send_addrs[0], sizeof (struct group_req)))
goto err_abort;
+
pgm_freeaddrinfo (res);
+ res = NULL;
// Set IP level parameters
{
@@ -287,12 +297,19 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, &multicast_hops, sizeof (multicast_hops)))
goto err_abort;
if (AF_INET6 != sa_family &&
- !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS, &dscp, sizeof (dscp)));
+ !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS, &dscp, sizeof (dscp)))
goto err_abort;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking, sizeof (nonblocking)))
goto err_abort;
}
+ // Connect PGM transport to start state machine.
+ if (!pgm_connect (sock, &pgm_error)) {
+// Invalid parameters don't set pgm_error_t
+ zmq_assert (pgm_error != NULL);
+ goto err_abort;
+ }
+
// For receiver transport preallocate pgm_msgv array.
if (receiver) {
zmq_assert (in_batch_size > 0);
@@ -439,7 +456,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
nbytes_rec = 0;
nbytes_processed = 0;
pgm_msgv_processed = 0;
- return 0;
+ errno = EAGAIN;
+ return -1;
}
// If we have are going first time or if we have processed all pgm_msgv_t
@@ -458,6 +476,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
const int status = pgm_recvmsgv (sock, pgm_msgv,
pgm_msgv_len, MSG_ERRQUEUE, &nbytes_rec, &pgm_error);
+ // Invalid parameters
zmq_assert (status != PGM_IO_STATUS_ERROR);
// In a case when no ODATA/RDATA fired POLLIN event (SPM...)
@@ -466,10 +485,17 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert (nbytes_rec == 0);
+ struct timeval tv;
+ socklen_t optlen = sizeof (tv);
+ const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &optlen);
+
+ zmq_assert (rc);
+
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
nbytes_rec = 0;
- return 0;
+ errno = EBUSY;
+ return -1;
}
// Send SPMR, NAK, ACK is rate limited.
@@ -477,10 +503,15 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert (nbytes_rec == 0);
+ struct timeval tv;
+ socklen_t optlen = sizeof (tv);
+ const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen);
+
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
nbytes_rec = 0;
- return 0;
+ errno = EBUSY;
+ return -1;
}
// No peers and hence no incoming packets.
@@ -491,7 +522,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
nbytes_rec = 0;
- return 0;
+ errno = EAGAIN;
+ return -1;
}
// Data loss.
@@ -549,6 +581,7 @@ void zmq::pgm_socket_t::process_upstream ()
const int status = pgm_recvmsgv (sock, &dummy_msg,
1, MSG_ERRQUEUE, &dummy_bytes, &pgm_error);
+ // Invalid parameters
zmq_assert (status != PGM_IO_STATUS_ERROR);
// No data should be returned.