diff options
author | malosek <malosek@fastmq.com> | 2009-09-25 17:50:12 +0200 |
---|---|---|
committer | malosek <malosek@fastmq.com> | 2009-09-25 17:50:12 +0200 |
commit | cf6cc0128ff4d26e0059f399bbb8342ce259b996 (patch) | |
tree | 080f672afefb80c2bffb3c80c2a4f96633f7f903 | |
parent | 72c5c5fff42fc0b4c9d1eaaaebe9d6e1dd8824f2 (diff) |
pgm2 receiver working (partly)
-rw-r--r-- | src/Makefile.am | 3 | ||||
-rw-r--r-- | src/pgm_socket.cpp | 161 | ||||
-rw-r--r-- | src/pgm_socket.hpp | 14 | ||||
-rw-r--r-- | src/socket_base.cpp | 4 |
4 files changed, 129 insertions, 53 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index e2f28e2..27784a0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -260,7 +260,8 @@ libzmq_la_CFLAGS = -I$(top_srcdir)/foreign/openpgm/@pgm_basename@/openpgm/pgm/in -DCONFIG_HAVE_GETPROTOBYNAME_R \ -DCONFIG_BIND_INADDR_ANY \ -DCONFIG_GALOIS_MUL_LUT \ - -DGETTEXT_PACKAGE=\'"pgm"\' + -DGETTEXT_PACKAGE='"pgm"' \ + -DG_LOG_DOMAIN='"Pgm"' endif if BUILD_NO_PGM diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index c35870d..65a80a5 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -119,6 +119,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (receiver) { pgm_msgv_len = get_max_apdu_at_once (in_batch_size); pgm_msgv = new pgm_msgv_t [pgm_msgv_len]; + zmq_log (1, "PGM transport: pgm_msgv_len %i, %s(%i)\n", + (int) pgm_msgv_len, __FILE__, __LINE__); } return 0; @@ -139,6 +141,12 @@ int zmq::pgm_socket_t::open_transport (void) nbytes_processed = 0; pgm_msgv_processed = 0; +#ifdef ZMQ_HAVE_OPENPGM1 + int pgm_ok = 0; +#elif defined ZMQ_HAVE_OPENPGM2 + int pgm_ok = true; +#endif + // Init PGM transport. // Ensure threading enabled, ensure timer enabled and find PGM protocol id. // @@ -170,8 +178,8 @@ int zmq::pgm_socket_t::open_transport (void) return -1; } - zmq_log (1, "Transport GSI: %s, %s(%i)\n", pgm_print_gsi (&gsi), - __FILE__, __LINE__); + //zmq_log (1, "Transport GSI: %s, %s(%i)\n", pgm_print_gsi (&gsi), + // __FILE__, __LINE__); #ifdef ZMQ_HAVE_OPENPGM1 // PGM transport GSRs. @@ -192,18 +200,17 @@ int zmq::pgm_socket_t::open_transport (void) errno = ENOMEM; return -1; } -#endif +#elif defined ZMQ_HAVE_OPENPGM2 + struct pgm_transport_info_t *res = NULL; + GError *pgm_error = NULL; -#ifdef ZMQ_HAVE_OPENPGM2 - struct pgm_transport_info_t* res = NULL; - - if (!pgm_if_get_transport_info (network, NULL, &res, NULL)) { + if (!pgm_if_get_transport_info (network, NULL, &res, &pgm_error)) { errno = EINVAL; return -1; } res->ti_gsi = gsi; - + res->ti_dport = port_number; #endif // If we are using UDP encapsulation update gsr or res. @@ -214,9 +221,7 @@ int zmq::pgm_socket_t::open_transport (void) g_htons (port_number); ((struct sockaddr_in*)&recv_gsr.gsr_group)->sin_port = g_htons (port_number); -#endif - -#ifdef ZMQ_HAVE_OPENPGM2 +#elif defined ZMQ_HAVE_OPENPGM2 res->ti_udp_encap_ucast_port = port_number; res->ti_udp_encap_mcast_port = port_number; #endif @@ -228,10 +233,8 @@ int zmq::pgm_socket_t::open_transport (void) if (rc != 0) { return -1; } -#endif - -#ifdef ZMQ_HAVE_OPENPGM2 - if (!pgm_transport_create (&g_transport, res, NULL)) { +#elif defined ZMQ_HAVE_OPENPGM2 + if (!pgm_transport_create (&g_transport, res, &pgm_error)) { pgm_if_free_transport_info (res); // TODO: tranlate errors from glib into errnos. errno = EINVAL; @@ -241,78 +244,88 @@ int zmq::pgm_socket_t::open_transport (void) pgm_if_free_transport_info (res); #endif + zmq_log (1, "PGM transport created, %s(%i)\n", __FILE__, __LINE__); + // Common parameters for receiver and sender. // Set maximum transport protocol data unit size (TPDU). rc = pgm_transport_set_max_tpdu (g_transport, pgm_max_tpdu); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } // Set maximum number of network hops to cross. rc = pgm_transport_set_hops (g_transport, 16); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } +#ifdef ZMQ_HAVE_OPENPGM2 + // Set nonblocking send/recv sockets. + if (!pgm_transport_set_nonblocking (g_transport, true)) { + errno = EINVAL; + return -1; + } +#endif + // Receiver transport. if (receiver) { - + // Set transport->can_send_data = FALSE. // Note that NAKs are still generated by the transport. rc = pgm_transport_set_recv_only (g_transport, false); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } // Set NAK transmit back-off interval [us]. rc = pgm_transport_set_nak_bo_ivl (g_transport, 50*1000); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } // Set timeout before repeating NAK [us]. rc = pgm_transport_set_nak_rpt_ivl (g_transport, 200*1000); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } // Set timeout for receiving RDATA. rc = pgm_transport_set_nak_rdata_ivl (g_transport, 200*1000); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } // Set retries for NAK without NCF/DATA (NAK_DATA_RETRIES). rc = pgm_transport_set_nak_data_retries (g_transport, 5); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } // Set retries for NCF after NAK (NAK_NCF_RETRIES). rc = pgm_transport_set_nak_ncf_retries (g_transport, 2); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } // Set timeout for removing a dead peer [us]. rc = pgm_transport_set_peer_expiry (g_transport, 5*8192*1000); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } // Set expiration time of SPM Requests [us]. rc = pgm_transport_set_spmr_expiry (g_transport, 25*1000); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } @@ -327,7 +340,7 @@ int zmq::pgm_socket_t::open_transport (void) rc = pgm_transport_set_rxw_max_rte (g_transport, options.rate * 1000 / 8); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } @@ -339,7 +352,7 @@ int zmq::pgm_socket_t::open_transport (void) } rc = pgm_transport_set_rxw_secs (g_transport, options.recovery_ivl); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } @@ -349,7 +362,7 @@ int zmq::pgm_socket_t::open_transport (void) // Set transport->can_recv = FALSE, waiting_pipe wont not be read. rc = pgm_transport_set_send_only (g_transport, TRUE); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } @@ -364,7 +377,7 @@ int zmq::pgm_socket_t::open_transport (void) rc = pgm_transport_set_txw_max_rte (g_transport, options.rate * 1000 / 8); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } @@ -376,7 +389,7 @@ int zmq::pgm_socket_t::open_transport (void) } rc = pgm_transport_set_txw_secs (g_transport, options.recovery_ivl); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } @@ -400,7 +413,7 @@ int zmq::pgm_socket_t::open_transport (void) // Set interval of background SPM packets [us]. rc = pgm_transport_set_ambient_spm (g_transport, 8192 * 1000); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } @@ -412,7 +425,7 @@ int zmq::pgm_socket_t::open_transport (void) rc = pgm_transport_set_heartbeat_spm (g_transport, spm_heartbeat, G_N_ELEMENTS(spm_heartbeat)); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } @@ -421,7 +434,7 @@ int zmq::pgm_socket_t::open_transport (void) // Enable multicast loopback. if (options.use_multicast_loop) { rc = pgm_transport_set_multicast_loop (g_transport, true); - if (rc != 0) { + if (rc != pgm_ok) { errno = EINVAL; return -1; } @@ -433,15 +446,15 @@ int zmq::pgm_socket_t::open_transport (void) if (rc != 0) { return -1; } -#endif - -#ifdef ZMQ_HAVE_OPENPGM2 - if (!pgm_transport_bind (g_transport, NULL)) { +#elif defined ZMQ_HAVE_OPENPGM2 + if (!pgm_transport_bind (g_transport, &pgm_error)) { // TODO: tranlate errors from glib into errnos. return -1; } #endif + zmq_log (1, "PGM transport binded, %s(%i)\n", __FILE__, __LINE__); + return 0; } @@ -585,26 +598,34 @@ size_t zmq::pgm_socket_t::get_max_apdu_at_once (size_t readbuf_size_) return apdu_count; } -#ifdef ZMQ_HAVE_OPENPGM1 // Allocate buffer for one packet from the transmit window, The memory buffer // is owned by the transmit window and so must be returned to the window with // content via pgm_transport_send() calls or unused with pgm_packetv_free1(). void *zmq::pgm_socket_t::get_buffer (size_t *size_) { +#ifdef ZMQ_HAVE_OPENPGM1 // Store size. *size_ = get_max_tsdu_size (); // Allocate one packet. return pgm_packetv_alloc (g_transport, false); +#elif ZMQ_HAVE_OPENPGM2 + zmq_assert (false); + +#endif } // Return an unused packet allocated from the transmit window // via pgm_packetv_alloc(). void zmq::pgm_socket_t::free_buffer (void *data_) { +#ifdef ZMQ_HAVE_OPENPGM1 pgm_packetv_free1 (g_transport, data_, false); -} +#elif ZMQ_HAVE_OPENPGM2 + zmq_assert (false); #endif +} + // pgm_transport_recvmsgv is called to fill the pgm_msgv array up to // pgm_msgv_len. In subsequent calls data from pgm_msgv structure are @@ -614,7 +635,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) size_t raw_data_len = 0; -#ifdef ZMQ_HAVE_OPENPGM1 // We just sent all data from pgm_transport_recvmsgv up // and have to return 0 that another engine in this thread is scheduled. if (nbytes_rec == nbytes_processed && nbytes_rec > 0) { @@ -638,9 +658,10 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // Receive a vector of Application Protocol Domain Unit's (APDUs) // from the transport. +#ifdef ZMQ_HAVE_OPENPGM1 nbytes_rec = pgm_transport_recvmsgv (g_transport, pgm_msgv, pgm_msgv_len, MSG_DONTWAIT); - + // In a case when no ODATA/RDATA fired POLLIN event (SPM...) // pgm_transport_recvmsg returns -1 with errno == EAGAIN. if (nbytes_rec == -1 && errno == EAGAIN) { @@ -666,17 +687,49 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // Catch the rest of the errors. if (nbytes_rec <= 0) { - zmq_log (2, "received %i B, errno %i, %s(%i)", (int)nbytes_rec, + zmq_log (2, "received %i B, errno %i, %s(%i).\n", (int)nbytes_rec, errno, __FILE__, __LINE__); errno_assert (nbytes_rec > 0); } - +#elif defined ZMQ_HAVE_OPENPGM2 + GError *pgm_error = NULL; + + const PGMIOStatus status = pgm_recvmsgv (g_transport, pgm_msgv, + pgm_msgv_len, MSG_DONTWAIT, &nbytes_rec, &pgm_error); + + if (nbytes_rec > 0) { + zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n", + status, (int) nbytes_rec, __FILE__, __LINE__); + } + + // In a case when no ODATA/RDATA fired POLLIN event (SPM...) + // pgm_recvmsg returns ?. + if (status == PGM_IO_STATUS_AGAIN || + status == PGM_IO_STATUS_AGAIN2) { + + // In case if no RDATA/ODATA caused POLLIN 0 is + // returned. + nbytes_rec = 0; + return 0; + } + + // Data loss? + if (status != PGM_IO_STATUS_NORMAL) { + zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n", + status, (int) nbytes_rec, __FILE__, __LINE__); + + nbytes_rec = 0; + return -1; + } +#endif + zmq_log (4, "received %i bytes\n", (int)nbytes_rec); } zmq_assert (nbytes_rec > 0); +#ifdef ZMQ_HAVE_OPENPGM1 // Only one APDU per pgm_msgv_t structure is allowed. zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_iovlen == 1); @@ -686,13 +739,25 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // Save current TSI. *tsi_ = pgm_msgv [pgm_msgv_processed].msgv_tsi; +#elif defined ZMQ_HAVE_OPENPGM2 + // Only one APDU per pgm_msgv_t structure is allowed. + zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1); + + struct pgm_sk_buff_t* skb = + pgm_msgv [pgm_msgv_processed].msgv_skb [0]; + + // Take pointers from pgm_msgv_t structure. + *raw_data_ = skb->data; + raw_data_len = skb->len; + + // Save current TSI. + *tsi_ = &skb->tsi; +#endif // Move the the next pgm_msgv_t structure. pgm_msgv_processed++; nbytes_processed +=raw_data_len; -#endif - zmq_log (4, "sendig up %i bytes\n", (int)raw_data_len); return raw_data_len; @@ -711,9 +776,7 @@ void zmq::pgm_socket_t::process_upstream (void) dummy_bytes = pgm_transport_recvmsgv (g_transport, &dummy_msg, 1, MSG_DONTWAIT); -#endif - -#ifdef ZMQ_HAVE_OPENPGM2 +#elif defined ZMQ_HAVE_OPENPGM2 zmq_assert (false); #endif diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index 6c1ca10..5225e50 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -119,16 +119,28 @@ namespace zmq pgm_msgv_t *pgm_msgv; // How many bytes were read from pgm socket. +#ifdef ZMQ_HAVE_OPENPGM1 ssize_t nbytes_rec; +#elif defined ZMQ_HAVE_OPENPGM2 + size_t nbytes_rec; +#endif // How many bytes were processed from last pgm socket read. +#ifdef ZMQ_HAVE_OPENPGM1 ssize_t nbytes_processed; +#elif defined ZMQ_HAVE_OPENPGM2 + size_t nbytes_processed; +#endif // How many messages from pgm_msgv were already sent up. +#ifdef ZMQ_HAVE_OPENPGM1 ssize_t pgm_msgv_processed; +#elif defined ZMQ_HAVE_OPENPGM2 + size_t pgm_msgv_processed; +#endif // Size of pgm_msgv array. - ssize_t pgm_msgv_len; + size_t pgm_msgv_len; // Sender transport uses 2 fd. enum {pgm_sender_fd_count = 2}; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index b466b8c..6763167 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -93,7 +93,7 @@ int zmq::socket_base_t::bind (const char *addr_) return 0; } -#if defined ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM if (addr_type == "pgm" || addr_type == "udp") { // In the case of PGM bind behaves the same like connect. return connect (addr_); @@ -179,7 +179,7 @@ int zmq::socket_base_t::connect (const char *addr_) return 0; } -#if defined ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM if (addr_type == "pgm" || addr_type == "udp") { // If the socket type requires bi-directional communication |