diff options
| -rw-r--r-- | configure.in | 10 | ||||
| -rw-r--r-- | src/pgm_receiver.cpp | 2 | ||||
| -rw-r--r-- | src/pgm_receiver.hpp | 2 | ||||
| -rw-r--r-- | src/pgm_sender.cpp | 2 | ||||
| -rw-r--r-- | src/pgm_sender.hpp | 2 | ||||
| -rw-r--r-- | src/pgm_socket.cpp | 91 | ||||
| -rw-r--r-- | src/pgm_socket.hpp | 9 | 
7 files changed, 89 insertions, 29 deletions
| diff --git a/configure.in b/configure.in index 03adbe1..d82db7b 100644 --- a/configure.in +++ b/configure.in @@ -129,11 +129,6 @@ case "${host_os}" in          ;;  esac -# If not on QNX nor OSX add -pedantic into LIBZMQ_EXTRA_CXXFLAGS. -if test "x$pedantic" = "xyes"; then -    LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} -pedantic" -fi -  # Check if we are running at sparc harware  AC_MSG_CHECKING([wheter __sparc__ is defined])  AC_COMPILE_IFELSE([AC_LANG_PROGRAM( @@ -522,6 +517,11 @@ fi  AC_SUBST(pgm_basename) +# If not on QNX nor OSX nor PGM add -pedantic into LIBZMQ_EXTRA_CXXFLAGS. +if test "x$pedantic" = "xyes" -a "x$pgm1_ext" = "xno" -a "x$pgm2_ext" = "xno"; then +    LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} -pedantic" +fi +  # If not on QNX nor --with-pgm/2add -Werror into LIBZMQ_EXTRA_CXXFLAGS.  if test "x$werror" = "xyes" -a "x$pgm1_ext" = "xno" -a "x$pgm2_ext" = "xno"; then      LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} -Werror" diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 998becb..50a8ff9 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -19,7 +19,7 @@  #include "platform.hpp" -#if defined ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM  #include <iostream> diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 3471324..b573081 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -22,7 +22,7 @@  #include "platform.hpp" -#if defined ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM  #include "io_object.hpp"  #include "i_engine.hpp" diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index d5f62ab..423865b 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -19,7 +19,7 @@  #include "platform.hpp" -#if defined ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM  #include <iostream> diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 32e97f6..8fdda6c 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -22,7 +22,7 @@  #include "platform.hpp" -#if defined ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM  #include "stdint.hpp"  #include "io_object.hpp" diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 0d7a3e2..c35870d 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -19,9 +19,12 @@  #include "platform.hpp" -#if defined ZMQ_HAVE_OPENPGM1 +#ifdef ZMQ_HAVE_OPENPGM  #ifdef ZMQ_HAVE_LINUX +//  TODO: add this into platform.hpp? +#define CONFIG_HAVE_POLL +  #include <pgm/pgm.h>  #include <openssl/md5.h>  #endif @@ -151,10 +154,6 @@ int zmq::pgm_socket_t::open_transport (void)      //  PGM transport GSI.      pgm_gsi_t gsi; -    //  PGM transport GSRs. -    struct group_source_req recv_gsr, send_gsr; -    size_t recv_gsr_len = 1; -      if (options.identity.size () > 0) {          //  Create gsi from identity string. @@ -174,6 +173,11 @@ int zmq::pgm_socket_t::open_transport (void)      zmq_log (1, "Transport GSI: %s, %s(%i)\n", pgm_print_gsi (&gsi),          __FILE__, __LINE__); +#ifdef ZMQ_HAVE_OPENPGM1 +    //  PGM transport GSRs. +    struct group_source_req recv_gsr, send_gsr; +    size_t recv_gsr_len = 1; +      //  On success, 0 is returned. On invalid arguments, -EINVAL is returned.       //  If more multicast groups are found than the recv_len parameter,       //  -ENOMEM is returned. @@ -188,24 +192,54 @@ int zmq::pgm_socket_t::open_transport (void)          errno = ENOMEM;          return -1;      } +#endif  -    //  If we are using UDP encapsulation update send_gsr & recv_gsr  -    //  structures. Note that send_gsr & recv_gsr has to be updated after  -    //  pgm_if_parse_transport call. -    if (udp_encapsulation) { +#ifdef ZMQ_HAVE_OPENPGM2 +    struct pgm_transport_info_t* res = NULL; +     +    if (!pgm_if_get_transport_info (network, NULL, &res, NULL)) { +        errno = EINVAL; +        return -1; +    } + +    res->ti_gsi = gsi; +#endif + +    //  If we are using UDP encapsulation update gsr or res.  +    if (udp_encapsulation) { +#ifdef ZMQ_HAVE_OPENPGM1          //  Use the same port for UDP encapsulation.          ((struct sockaddr_in*)&send_gsr.gsr_group)->sin_port =               g_htons (port_number);  	((struct sockaddr_in*)&recv_gsr.gsr_group)->sin_port =               g_htons (port_number); +#endif + +#ifdef ZMQ_HAVE_OPENPGM2 +        res->ti_udp_encap_ucast_port = port_number; +        res->ti_udp_encap_mcast_port = port_number; +#endif      } +#ifdef ZMQ_HAVE_OPENPGM1      rc = pgm_transport_create (&g_transport, &gsi, 0, port_number, &recv_gsr,           1, &send_gsr);      if (rc != 0) {          return -1;      } +#endif + +#ifdef ZMQ_HAVE_OPENPGM2 +    if (!pgm_transport_create (&g_transport, res, NULL)) {  +        pgm_if_free_transport_info (res); +        //  TODO: tranlate errors from glib into errnos. +        errno = EINVAL; +        return -1; +    } + +    pgm_if_free_transport_info (res); +#endif      //  Common parameters for receiver and sender. @@ -347,6 +381,7 @@ int zmq::pgm_socket_t::open_transport (void)              return -1;          } +#ifdef ZMQ_HAVE_OPENPGM1          //  Preallocate full transmit window. For simplification always           //  worst case is used (40 bytes ipv6 header and 20 bytes UDP           //  encapsulation). @@ -361,6 +396,7 @@ int zmq::pgm_socket_t::open_transport (void)          zmq_log (2, "Preallocated %i slices in TX window. %s(%i)\n",               to_preallocate, __FILE__, __LINE__); +#endif          //  Set interval of background SPM packets [us].          rc = pgm_transport_set_ambient_spm (g_transport, 8192 * 1000); @@ -392,10 +428,19 @@ int zmq::pgm_socket_t::open_transport (void)      }      //  Bind a transport to the specified network devices. +#ifdef ZMQ_HAVE_OPENPGM1      rc = pgm_transport_bind (g_transport);      if (rc != 0) {          return -1;      } +#endif + +#ifdef ZMQ_HAVE_OPENPGM2 +   if (!pgm_transport_bind (g_transport, NULL)) { +        //  TODO: tranlate errors from glib into errnos. +        return -1; +   } +#endif      return 0;  } @@ -484,9 +529,13 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_)  size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)  { +    ssize_t nbytes = 0; + +#ifdef ZMQ_HAVE_OPENPGM1 +      iovec iov = {data_,data_len_}; -    ssize_t nbytes = pgm_transport_send_packetv (g_transport, &iov, 1,  +    nbytes = pgm_transport_send_packetv (g_transport, &iov, 1,           MSG_DONTWAIT | MSG_WAITALL, true);      zmq_assert (nbytes != -EINVAL); @@ -505,6 +554,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)      if (nbytes > 0) {          zmq_assert (nbytes == (ssize_t)data_len_);      } +#endif      return nbytes;  } @@ -535,6 +585,7 @@ size_t zmq::pgm_socket_t::get_max_apdu_at_once (size_t readbuf_size_)      return apdu_count;  } +#ifdef ZMQ_HAVE_OPENPGM1  //  Allocate buffer for one packet from the transmit window, The memory buffer   //  is owned by the transmit window and so must be returned to the window with   //  content via pgm_transport_send() calls or unused with pgm_packetv_free1().  @@ -553,12 +604,17 @@ void zmq::pgm_socket_t::free_buffer (void *data_)  {      pgm_packetv_free1 (g_transport, data_, false);  } +#endif  //  pgm_transport_recvmsgv is called to fill the pgm_msgv array up to   //  pgm_msgv_len. In subsequent calls data from pgm_msgv structure are   //  returned.  ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)  { + +    size_t raw_data_len = 0; + +#ifdef ZMQ_HAVE_OPENPGM1      //  We just sent all data from pgm_transport_recvmsgv up       //  and have to return 0 that another engine in this thread is scheduled.      if (nbytes_rec == nbytes_processed && nbytes_rec > 0) { @@ -626,7 +682,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)      //  Take pointers from pgm_msgv_t structure.      *raw_data_ = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_base; -    size_t raw_data_len = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_len; +    raw_data_len = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_len;      //  Save current TSI.      *tsi_ = pgm_msgv [pgm_msgv_processed].msgv_tsi; @@ -635,6 +691,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)      pgm_msgv_processed++;      nbytes_processed +=raw_data_len; +#endif +      zmq_log (4, "sendig up %i bytes\n", (int)raw_data_len);      return raw_data_len; @@ -643,12 +701,21 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)  void zmq::pgm_socket_t::process_upstream (void)  {      zmq_log (1, "On upstream packet, %s(%i)\n", __FILE__, __LINE__); + +    ssize_t dummy_bytes = 0; + +#ifdef ZMQ_HAVE_OPENPGM1      //  We acctually do not want to read any data here we are going to       //  process NAK.      pgm_msgv_t dummy_msg; -    ssize_t dummy_bytes = pgm_transport_recvmsgv (g_transport, &dummy_msg, +    dummy_bytes = pgm_transport_recvmsgv (g_transport, &dummy_msg,          1, MSG_DONTWAIT); +#endif + +#ifdef ZMQ_HAVE_OPENPGM2 +    zmq_assert (false); +#endif      //  No data should be returned.      zmq_assert (dummy_bytes == -1 && errno == EAGAIN); diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index fc7a0a3..6c1ca10 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -22,7 +22,7 @@  #include "platform.hpp" -#if defined ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM  #ifdef ZMQ_HAVE_LINUX  #include <glib.h> @@ -135,13 +135,6 @@ namespace zmq          //  Receiver transport uses 2 fd.          enum {pgm_receiver_fd_count = 2}; - -        //  TSI of the actual peer. -//        pgm_tsi_t tsi; - -        //  Previous peer TSI. -//        pgm_tsi_t retired_tsi; -  #endif      };  } | 
