diff options
| -rw-r--r-- | src/Makefile.am | 3 | ||||
| -rw-r--r-- | src/pgm_socket.cpp | 161 | ||||
| -rw-r--r-- | src/pgm_socket.hpp | 14 | ||||
| -rw-r--r-- | src/socket_base.cpp | 4 | 
4 files changed, 129 insertions, 53 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index e2f28e2..27784a0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -260,7 +260,8 @@ libzmq_la_CFLAGS = -I$(top_srcdir)/foreign/openpgm/@pgm_basename@/openpgm/pgm/in      -DCONFIG_HAVE_GETPROTOBYNAME_R \      -DCONFIG_BIND_INADDR_ANY \      -DCONFIG_GALOIS_MUL_LUT \ -    -DGETTEXT_PACKAGE=\'"pgm"\' +    -DGETTEXT_PACKAGE='"pgm"' \ +    -DG_LOG_DOMAIN='"Pgm"'  endif  if BUILD_NO_PGM diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index c35870d..65a80a5 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -119,6 +119,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)      if (receiver) {          pgm_msgv_len = get_max_apdu_at_once (in_batch_size);          pgm_msgv = new pgm_msgv_t [pgm_msgv_len]; +        zmq_log (1, "PGM transport: pgm_msgv_len %i, %s(%i)\n",  +            (int) pgm_msgv_len, __FILE__, __LINE__);      }      return 0; @@ -139,6 +141,12 @@ int zmq::pgm_socket_t::open_transport (void)      nbytes_processed = 0;      pgm_msgv_processed = 0; +#ifdef ZMQ_HAVE_OPENPGM1 +    int pgm_ok = 0; +#elif defined ZMQ_HAVE_OPENPGM2 +    int pgm_ok = true; +#endif +      //  Init PGM transport.      //  Ensure threading enabled, ensure timer enabled and find PGM protocol id.      // @@ -170,8 +178,8 @@ int zmq::pgm_socket_t::open_transport (void)          return -1;      } -    zmq_log (1, "Transport GSI: %s, %s(%i)\n", pgm_print_gsi (&gsi), -        __FILE__, __LINE__); +    //zmq_log (1, "Transport GSI: %s, %s(%i)\n", pgm_print_gsi (&gsi), +    //    __FILE__, __LINE__);  #ifdef ZMQ_HAVE_OPENPGM1      //  PGM transport GSRs. @@ -192,18 +200,17 @@ int zmq::pgm_socket_t::open_transport (void)          errno = ENOMEM;          return -1;      } -#endif  +#elif defined ZMQ_HAVE_OPENPGM2 +    struct pgm_transport_info_t *res = NULL; +    GError *pgm_error = NULL; -#ifdef ZMQ_HAVE_OPENPGM2 -    struct pgm_transport_info_t* res = NULL; -     -    if (!pgm_if_get_transport_info (network, NULL, &res, NULL)) { +    if (!pgm_if_get_transport_info (network, NULL, &res, &pgm_error)) {          errno = EINVAL;          return -1;      }      res->ti_gsi = gsi; - +    res->ti_dport = port_number;  #endif      //  If we are using UDP encapsulation update gsr or res.  @@ -214,9 +221,7 @@ int zmq::pgm_socket_t::open_transport (void)              g_htons (port_number);  	((struct sockaddr_in*)&recv_gsr.gsr_group)->sin_port =               g_htons (port_number); -#endif - -#ifdef ZMQ_HAVE_OPENPGM2 +#elif defined ZMQ_HAVE_OPENPGM2          res->ti_udp_encap_ucast_port = port_number;          res->ti_udp_encap_mcast_port = port_number;  #endif @@ -228,10 +233,8 @@ int zmq::pgm_socket_t::open_transport (void)      if (rc != 0) {          return -1;      } -#endif - -#ifdef ZMQ_HAVE_OPENPGM2 -    if (!pgm_transport_create (&g_transport, res, NULL)) {  +#elif defined ZMQ_HAVE_OPENPGM2 +    if (!pgm_transport_create (&g_transport, res, &pgm_error)) {           pgm_if_free_transport_info (res);          //  TODO: tranlate errors from glib into errnos.          errno = EINVAL; @@ -241,78 +244,88 @@ int zmq::pgm_socket_t::open_transport (void)      pgm_if_free_transport_info (res);  #endif +    zmq_log (1, "PGM transport created, %s(%i)\n", __FILE__, __LINE__); +      //  Common parameters for receiver and sender.      //  Set maximum transport protocol data unit size (TPDU).      rc = pgm_transport_set_max_tpdu (g_transport, pgm_max_tpdu); -    if (rc != 0) { +    if (rc != pgm_ok) {          errno = EINVAL;          return -1;      }      //  Set maximum number of network hops to cross.      rc = pgm_transport_set_hops (g_transport, 16); -    if (rc != 0) { +    if (rc != pgm_ok) {          errno = EINVAL;          return -1;      } +#ifdef ZMQ_HAVE_OPENPGM2 +    //  Set nonblocking send/recv sockets. +    if (!pgm_transport_set_nonblocking (g_transport, true)) { +        errno = EINVAL; +        return -1; +    } +#endif +      //  Receiver transport.      if (receiver) { - +                  //  Set transport->can_send_data = FALSE.          //  Note that NAKs are still generated by the transport.          rc = pgm_transport_set_recv_only (g_transport, false); -        if (rc != 0) { +        if (rc != pgm_ok) {              errno = EINVAL;              return -1;          }          //  Set NAK transmit back-off interval [us].          rc = pgm_transport_set_nak_bo_ivl (g_transport, 50*1000); -        if (rc != 0) { +        if (rc != pgm_ok) {              errno = EINVAL;              return -1;          }          //  Set timeout before repeating NAK [us].          rc = pgm_transport_set_nak_rpt_ivl (g_transport, 200*1000); -        if (rc != 0) { +        if (rc != pgm_ok) {              errno = EINVAL;              return -1;          }          //  Set timeout for receiving RDATA.          rc = pgm_transport_set_nak_rdata_ivl (g_transport, 200*1000); -        if (rc != 0) { +        if (rc != pgm_ok) {              errno = EINVAL;              return -1;          }          //  Set retries for NAK without NCF/DATA (NAK_DATA_RETRIES).          rc = pgm_transport_set_nak_data_retries (g_transport, 5); -        if (rc != 0) { +        if (rc != pgm_ok) {              errno = EINVAL;              return -1;          }          //  Set retries for NCF after NAK (NAK_NCF_RETRIES).          rc = pgm_transport_set_nak_ncf_retries (g_transport, 2); -        if (rc != 0) { +        if (rc != pgm_ok) {              errno = EINVAL;              return -1;          }          //  Set timeout for removing a dead peer [us].          rc = pgm_transport_set_peer_expiry (g_transport, 5*8192*1000); -        if (rc != 0) { +        if (rc != pgm_ok) {              errno = EINVAL;              return -1;          }          //  Set expiration time of SPM Requests [us].          rc = pgm_transport_set_spmr_expiry (g_transport, 25*1000); -        if (rc != 0) { +        if (rc != pgm_ok) {              errno = EINVAL;              return -1;          } @@ -327,7 +340,7 @@ int zmq::pgm_socket_t::open_transport (void)          rc = pgm_transport_set_rxw_max_rte (g_transport,               options.rate * 1000 / 8); -        if (rc != 0) { +        if (rc != pgm_ok) {              errno = EINVAL;              return -1;          } @@ -339,7 +352,7 @@ int zmq::pgm_socket_t::open_transport (void)          }          rc = pgm_transport_set_rxw_secs (g_transport, options.recovery_ivl); -        if (rc != 0) { +        if (rc != pgm_ok) {              errno = EINVAL;              return -1;          } @@ -349,7 +362,7 @@ int zmq::pgm_socket_t::open_transport (void)          //  Set transport->can_recv = FALSE, waiting_pipe wont not be read.          rc = pgm_transport_set_send_only (g_transport, TRUE); -        if (rc != 0) { +        if (rc != pgm_ok) {              errno = EINVAL;              return -1;          } @@ -364,7 +377,7 @@ int zmq::pgm_socket_t::open_transport (void)          rc = pgm_transport_set_txw_max_rte (g_transport,               options.rate * 1000 / 8); -        if (rc != 0) { +        if (rc != pgm_ok) {              errno = EINVAL;              return -1;          } @@ -376,7 +389,7 @@ int zmq::pgm_socket_t::open_transport (void)          }          rc = pgm_transport_set_txw_secs (g_transport, options.recovery_ivl); -        if (rc != 0) { +        if (rc != pgm_ok) {              errno = EINVAL;              return -1;          } @@ -400,7 +413,7 @@ int zmq::pgm_socket_t::open_transport (void)          //  Set interval of background SPM packets [us].          rc = pgm_transport_set_ambient_spm (g_transport, 8192 * 1000); -        if (rc != 0) { +        if (rc != pgm_ok) {              errno = EINVAL;              return -1;          } @@ -412,7 +425,7 @@ int zmq::pgm_socket_t::open_transport (void)  	rc = pgm_transport_set_heartbeat_spm (g_transport, spm_heartbeat,               G_N_ELEMENTS(spm_heartbeat)); -        if (rc != 0) { +        if (rc != pgm_ok) {              errno = EINVAL;              return -1;          } @@ -421,7 +434,7 @@ int zmq::pgm_socket_t::open_transport (void)      //  Enable multicast loopback.      if (options.use_multicast_loop) {          rc = pgm_transport_set_multicast_loop (g_transport, true); -        if (rc != 0) { +        if (rc != pgm_ok) {              errno = EINVAL;              return -1;          } @@ -433,15 +446,15 @@ int zmq::pgm_socket_t::open_transport (void)      if (rc != 0) {          return -1;      } -#endif - -#ifdef ZMQ_HAVE_OPENPGM2 -   if (!pgm_transport_bind (g_transport, NULL)) { +#elif defined ZMQ_HAVE_OPENPGM2 +   if (!pgm_transport_bind (g_transport, &pgm_error)) {          //  TODO: tranlate errors from glib into errnos.          return -1;     }  #endif +    zmq_log (1, "PGM transport binded, %s(%i)\n", __FILE__, __LINE__); +      return 0;  } @@ -585,26 +598,34 @@ size_t zmq::pgm_socket_t::get_max_apdu_at_once (size_t readbuf_size_)      return apdu_count;  } -#ifdef ZMQ_HAVE_OPENPGM1  //  Allocate buffer for one packet from the transmit window, The memory buffer   //  is owned by the transmit window and so must be returned to the window with   //  content via pgm_transport_send() calls or unused with pgm_packetv_free1().   void *zmq::pgm_socket_t::get_buffer (size_t *size_)  { +#ifdef ZMQ_HAVE_OPENPGM1      //  Store size.      *size_ = get_max_tsdu_size ();      //  Allocate one packet.      return pgm_packetv_alloc (g_transport, false); +#elif ZMQ_HAVE_OPENPGM2 +    zmq_assert (false); + +#endif  }  //  Return an unused packet allocated from the transmit window   //  via pgm_packetv_alloc().   void zmq::pgm_socket_t::free_buffer (void *data_)  { +#ifdef ZMQ_HAVE_OPENPGM1      pgm_packetv_free1 (g_transport, data_, false); -} +#elif ZMQ_HAVE_OPENPGM2 +    zmq_assert (false);  #endif +} +  //  pgm_transport_recvmsgv is called to fill the pgm_msgv array up to   //  pgm_msgv_len. In subsequent calls data from pgm_msgv structure are  @@ -614,7 +635,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)      size_t raw_data_len = 0; -#ifdef ZMQ_HAVE_OPENPGM1      //  We just sent all data from pgm_transport_recvmsgv up       //  and have to return 0 that another engine in this thread is scheduled.      if (nbytes_rec == nbytes_processed && nbytes_rec > 0) { @@ -638,9 +658,10 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)          //  Receive a vector of Application Protocol Domain Unit's (APDUs)           //  from the transport. +#ifdef ZMQ_HAVE_OPENPGM1          nbytes_rec = pgm_transport_recvmsgv (g_transport, pgm_msgv,               pgm_msgv_len, MSG_DONTWAIT); -   +          //  In a case when no ODATA/RDATA fired POLLIN event (SPM...)          //  pgm_transport_recvmsg returns -1 with errno == EAGAIN.          if (nbytes_rec == -1 && errno == EAGAIN) { @@ -666,17 +687,49 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)          //  Catch the rest of the errors.          if (nbytes_rec <= 0) { -            zmq_log (2, "received %i B, errno %i, %s(%i)", (int)nbytes_rec,  +            zmq_log (2, "received %i B, errno %i, %s(%i).\n", (int)nbytes_rec,                   errno, __FILE__, __LINE__);              errno_assert (nbytes_rec > 0);          } -    +#elif defined ZMQ_HAVE_OPENPGM2 +        GError *pgm_error = NULL; + +        const PGMIOStatus status = pgm_recvmsgv (g_transport, pgm_msgv, +            pgm_msgv_len, MSG_DONTWAIT, &nbytes_rec, &pgm_error); +      +        if (nbytes_rec > 0) { +            zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n",  +                status, (int) nbytes_rec, __FILE__, __LINE__); +        } + +        //  In a case when no ODATA/RDATA fired POLLIN event (SPM...) +        //  pgm_recvmsg returns ?. +        if (status == PGM_IO_STATUS_AGAIN || +              status == PGM_IO_STATUS_AGAIN2) { +         +            //  In case if no RDATA/ODATA caused POLLIN 0 is  +            //  returned. +            nbytes_rec = 0; +            return 0; +        } + +        //  Data loss?  +        if (status != PGM_IO_STATUS_NORMAL) { +            zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n",  +                status, (int) nbytes_rec, __FILE__, __LINE__); + +            nbytes_rec = 0; +            return -1; +        } +#endif +          zmq_log (4, "received %i bytes\n", (int)nbytes_rec);      }      zmq_assert (nbytes_rec > 0); +#ifdef ZMQ_HAVE_OPENPGM1      // Only one APDU per pgm_msgv_t structure is allowed.       zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_iovlen == 1); @@ -686,13 +739,25 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)      //  Save current TSI.      *tsi_ = pgm_msgv [pgm_msgv_processed].msgv_tsi; +#elif defined ZMQ_HAVE_OPENPGM2 +    // Only one APDU per pgm_msgv_t structure is allowed. +    zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1); +  +    struct pgm_sk_buff_t* skb =  +        pgm_msgv [pgm_msgv_processed].msgv_skb [0]; + +    //  Take pointers from pgm_msgv_t structure. +    *raw_data_ = skb->data; +    raw_data_len = skb->len; + +    //  Save current TSI. +    *tsi_ = &skb->tsi; +#endif      //  Move the the next pgm_msgv_t structure.      pgm_msgv_processed++;      nbytes_processed +=raw_data_len; -#endif -      zmq_log (4, "sendig up %i bytes\n", (int)raw_data_len);      return raw_data_len; @@ -711,9 +776,7 @@ void zmq::pgm_socket_t::process_upstream (void)      dummy_bytes = pgm_transport_recvmsgv (g_transport, &dummy_msg,          1, MSG_DONTWAIT); -#endif - -#ifdef ZMQ_HAVE_OPENPGM2 +#elif defined ZMQ_HAVE_OPENPGM2      zmq_assert (false);  #endif diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index 6c1ca10..5225e50 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -119,16 +119,28 @@ namespace zmq          pgm_msgv_t *pgm_msgv;          // How many bytes were read from pgm socket. +#ifdef ZMQ_HAVE_OPENPGM1          ssize_t nbytes_rec; +#elif defined ZMQ_HAVE_OPENPGM2 +        size_t nbytes_rec; +#endif          //  How many bytes were processed from last pgm socket read. +#ifdef ZMQ_HAVE_OPENPGM1          ssize_t nbytes_processed; +#elif defined ZMQ_HAVE_OPENPGM2 +        size_t nbytes_processed; +#endif          //  How many messages from pgm_msgv were already sent up. +#ifdef ZMQ_HAVE_OPENPGM1          ssize_t pgm_msgv_processed; +#elif defined ZMQ_HAVE_OPENPGM2 +        size_t pgm_msgv_processed; +#endif          //  Size of pgm_msgv array. -        ssize_t pgm_msgv_len; +        size_t pgm_msgv_len;          //  Sender transport uses 2 fd.          enum {pgm_sender_fd_count = 2}; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index b466b8c..6763167 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -93,7 +93,7 @@ int zmq::socket_base_t::bind (const char *addr_)          return 0;      } -#if defined ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM      if (addr_type == "pgm" || addr_type == "udp") {          //  In the case of PGM bind behaves the same like connect.          return connect (addr_);  @@ -179,7 +179,7 @@ int zmq::socket_base_t::connect (const char *addr_)          return 0;      } -#if defined ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM      if (addr_type == "pgm" || addr_type == "udp") {          //  If the socket type requires bi-directional communication | 
