diff options
-rw-r--r-- | Makefile.am | 4 | ||||
-rw-r--r-- | configure.in | 134 | ||||
-rw-r--r-- | foreign/openpgm/libpgm-1.2.14.tar.bz2 | bin | 280029 -> 0 bytes | |||
-rw-r--r-- | foreign/openpgm/lost_data_tsi.patch | 76 | ||||
-rw-r--r-- | src/Makefile.am | 59 | ||||
-rw-r--r-- | src/pgm_receiver.cpp | 10 | ||||
-rw-r--r-- | src/pgm_sender.cpp | 18 | ||||
-rw-r--r-- | src/pgm_sender.hpp | 2 | ||||
-rw-r--r-- | src/pgm_socket.cpp | 226 | ||||
-rw-r--r-- | src/pgm_socket.hpp | 16 |
10 files changed, 18 insertions, 527 deletions
diff --git a/Makefile.am b/Makefile.am index 1bda895..aa3b0de 100644 --- a/Makefile.am +++ b/Makefile.am @@ -9,13 +9,11 @@ endif SUBDIRS = src $(DIR_MAN) $(DIR_PERF) devices bindings examples DIST_SUBDIRS = src man perf devices bindings examples -EXTRA_DIST = $(top_srcdir)/foreign/openpgm/@pgm1_basename@.tar.bz2 \ +EXTRA_DIST = \ $(top_srcdir)/foreign/openpgm/@pgm2_basename@.tar.bz2 \ -$(top_srcdir)/foreign/openpgm/lost_data_tsi.patch \ $(top_srcdir)/foreign/openpgm/create_custom_gsi_1.patch \ $(top_srcdir)/foreign/xmlParser/xmlParser.cpp \ $(top_srcdir)/foreign/xmlParser/xmlParser.hpp dist-hook: - -rm -rf $(distdir)/foreign/openpgm/@pgm1_basename@ -rm -rf $(distdir)/foreign/openpgm/@pgm2_basename@ diff --git a/configure.in b/configure.in index 49d94f6..9d034ed 100644 --- a/configure.in +++ b/configure.in @@ -407,33 +407,22 @@ if test "x$clzmq" = "xyes"; then fi # PGM extension -pgm1_ext="no" pgm2_ext="no" -pgm1_basename="libpgm-1.2.14" pgm2_basename="libpgm-2.0.17rc2" -AC_SUBST(pgm1_basename) AC_SUBST(pgm2_basename) pgm_basename="" AC_ARG_WITH([pgm], [AS_HELP_STRING([--with-pgm], - [build libzmq with PGM v1 extension [default=no]])], - [with_pgm1_ext=yes], [with_pgm1_ext=no]) - -AC_ARG_WITH([pgm2], [AS_HELP_STRING([--with-pgm2], - [build libzmq with PGM v2 extension [default=no]])], + [build libzmq with PGM extension [default=no]])], [with_pgm2_ext=yes], [with_pgm2_ext=no]) -AC_ARG_WITH([pgm2-examples], [AS_HELP_STRING([--with-pgm2-examples], - [build PGM v2 examples [default=no]])], +AC_ARG_WITH([pgm-examples], [AS_HELP_STRING([--with-pgm-examples], + [build PGM examples [default=no]])], [with_pgm2_examples=yes], [with_pgm2_examples=no]) -if test "x$with_pgm1_ext" != "xno" -a "x$with_pgm2_ext" != "xno"; then - AC_MSG_ERROR([Can not configure --with-pgm and --with-pgm2.]); -fi - if test "x$with_pgm2_ext" = "xno" -a "x$with_pgm2_examples" = "xyes"; then AC_MSG_ERROR([Can not configure --with-pgm2-examples without --with-pgm2.]); fi @@ -442,98 +431,6 @@ if test "x$c" = "xno" -a "x$with_pgm2_examples" = "xyes"; then AC_MSG_ERROR([Can not configure --with-pgm2-examples without --with-c.]); fi -if test "x$with_pgm1_ext" != "xno"; then - - pgm_basename=${pgm1_basename} - - # Test if we have pkg-config - if test "x$have_pkg_config" != "xyes"; then - AC_MSG_ERROR([To run configure with --with-pgm option, pkg-config has to be installed.]); - fi - - case "${host_os}" in - *linux*) - LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} -Wno-variadic-macros -Wno-long-long " - ;; - *) - AC_MSG_ERROR([PGM extesion is not supported on this platform $host.]) - ;; - esac - - AC_CHECK_PROG(have_tar, tar, yes, no) - if test "x$have_tar" != "xyes"; then - AC_MSG_ERROR([Could not find tar.]) - fi - - AC_CHECK_PROG(have_patch, patch, yes, no) - if test "x$have_patch" != "xyes"; then - AC_MSG_ERROR([Could not find patch.]) - fi - - AC_CHECK_PROG(have_bunzip2, bunzip2, yes, no) - if test "x$have_bunzip2" != "xyes"; then - AC_MSG_ERROR([Could not find bunzip2.]) - fi - - AC_CHECK_PROG(have_perl, perl, yes, no) - if test "x$have_perl" != "xyes"; then - AC_MSG_ERROR([Could not find perl.]) - fi - - if test "x$pyzmq" != "xyes"; then - AC_CHECK_PROG(have_python, python, yes, no) - if test "x$have_python" != "xyes"; then - AC_MSG_ERROR([Could not find python.]) - fi - fi - - # Unpack libpgm1 - AC_MSG_CHECKING([Unpacking ${pgm_basename}.tar.bz2]) - - if tar -xjf foreign/openpgm/${pgm_basename}.tar.bz2 -C foreign/openpgm/; then - AC_MSG_RESULT([yes]) - else - AC_MSG_ERROR([Could not unpack foreign/openpgm/${pgm_basename}.tar.bz2 file.]) - fi - - AC_MSG_CHECKING([Patching ${pgm_basename}]) - - if patch --silent -p0 < foreign/openpgm/lost_data_tsi.patch; then - AC_MSG_RESULT([yes]) - else - AC_MSG_ERROR([Could not apply foreign/openpgm/lost_data_tsi.patch file.]) - fi - - AC_MSG_CHECKING([Patching ${pgm_basename}]) - - if patch --silent -p0 < foreign/openpgm/create_custom_gsi_1.patch; then - AC_MSG_RESULT([yes]) - else - AC_MSG_ERROR([Could not apply foreign/openpgm/create_custom_gsi_1.patch file.]) - fi - - # Generate galois_tables.c - AC_CONFIG_COMMANDS([galois_tables.c], - [perl foreign/openpgm/libpgm-1.2.14/openpgm/pgm/galois_generator.pl > \ - foreign/openpgm/libpgm-1.2.14/openpgm/pgm/galois_tables.c]) - - # Generate version.c - AC_CONFIG_COMMANDS([version.c], - [python foreign/openpgm/libpgm-1.2.14/openpgm/pgm/version_generator.py > \ - foreign/openpgm/libpgm-1.2.14/openpgm/pgm/version.c]) - - # Check for OpenPGM nedded libraries. - PKG_CHECK_MODULES([GLIB], [glib-2.0 gthread-2.0]) - - LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} ${GLIB_CFLAGS} " - - LIBZMQ_EXTRA_LDFLAFS="${LIBZMQ_EXTRA_LDFLAFS} ${GLIB_LIBS}" - - AC_DEFINE(ZMQ_HAVE_OPENPGM, 1, [Have OpenPGM v1 or v2 extension.]) - AC_DEFINE(ZMQ_HAVE_OPENPGM1, 1, [Have OpenPGM v1 extension.]) - pgm1_ext="yes" -fi - if test "x$with_pgm2_ext" != "xno"; then pgm_basename=${pgm2_basename} @@ -596,12 +493,12 @@ fi AC_SUBST(pgm_basename) # If not on QNX nor OSX nor PGM add -pedantic into LIBZMQ_EXTRA_CXXFLAGS. -if test "x$pedantic" = "xyes" -a "x$pgm1_ext" = "xno" -a "x$pgm2_ext" = "xno"; then +if test "x$pedantic" = "xyes" -a "x$pgm2_ext" = "xno"; then LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} -pedantic" fi -# If not on QNX nor --with-pgm/2add -Werror into LIBZMQ_EXTRA_CXXFLAGS. -if test "x$werror" = "xyes" -a "x$pgm1_ext" = "xno" -a "x$pgm2_ext" = "xno"; then +# If not on QNX nor --with-pgm2 add -Werror into LIBZMQ_EXTRA_CXXFLAGS. +if test "x$werror" = "xyes" -a "x$pgm2_ext" = "xno"; then LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} -Werror" fi @@ -655,9 +552,8 @@ AM_CONDITIONAL(BUILD_RUBY, test "x$rbzmq" = "xyes") AM_CONDITIONAL(BUILD_C, test "x$czmq" = "xyes") AM_CONDITIONAL(BUILD_CL, test "x$clzmq" = "xyes") AM_CONDITIONAL(BUILD_CPP, test "x$cppzmq" = "xyes") -AM_CONDITIONAL(BUILD_PGM1, test "x$pgm1_ext" = "xyes") AM_CONDITIONAL(BUILD_PGM2, test "x$pgm2_ext" = "xyes") -AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno" -a "x$pgm1_ext" = "xno") +AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno") AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes") AM_CONDITIONAL(BUILD_STREAMER, test "x$streamer" = "xyes") AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes") @@ -712,17 +608,11 @@ AC_MSG_RESULT([ Ruby: $rbzmq]) if test "x$rbzmq" = "xyes"; then AC_MSG_RESULT([ Ruby library install dir: $rubydir]) fi -AC_MSG_RESULT([ Network protocols:]) -AC_MSG_RESULT([ TCP: yes]) -if test "x$pgm1_ext" = "xyes"; then -AC_MSG_RESULT([ PGM: $pgm1_ext ($pgm_basename)]) -fi -if test "x$pgm2_ext" = "xyes"; then -AC_MSG_RESULT([ PGM: $pgm2_ext ($pgm_basename)]) -fi -if test "x$pgm1_ext" = "xno" -a "x$pgm2_ext" = "xno"; then -AC_MSG_RESULT([ PGM: no]) -fi +AC_MSG_RESULT([ Transports:]) +AC_MSG_RESULT([ tcp: yes]) +AC_MSG_RESULT([ udp: $pgm2_ext]) +AC_MSG_RESULT([ pgm: $pgm2_ext]) +AC_MSG_RESULT([ inproc: yes]) AC_MSG_RESULT([ Devices:]) AC_MSG_RESULT([ Forwarder: $forwarder]) AC_MSG_RESULT([ Streamer: $streamer]) diff --git a/foreign/openpgm/libpgm-1.2.14.tar.bz2 b/foreign/openpgm/libpgm-1.2.14.tar.bz2 Binary files differdeleted file mode 100644 index 718465d..0000000 --- a/foreign/openpgm/libpgm-1.2.14.tar.bz2 +++ /dev/null diff --git a/foreign/openpgm/lost_data_tsi.patch b/foreign/openpgm/lost_data_tsi.patch deleted file mode 100644 index 6bfe02f..0000000 --- a/foreign/openpgm/lost_data_tsi.patch +++ /dev/null @@ -1,76 +0,0 @@ ---- foreign/openpgm/libpgm-1.2.14/openpgm/pgm/transport.c 2009-08-27 04:54:04.000000000 +0200 -+++ foreign/openpgm/libpgm-1.2.14/openpgm/pgm/transport.c 2009-09-22 14:36:07.713124619 +0200 -@@ -2342,6 +2342,7 @@ - if (waiting_rxw->ack_cumulative_losses != waiting_rxw->cumulative_losses) - { - transport->has_lost_data = TRUE; -+ memcpy (&(transport->lost_data_tsi), waiting_rxw->identifier, sizeof (pgm_tsi_t)); - waiting_rxw->pgm_sock_err.lost_count = waiting_rxw->cumulative_losses - waiting_rxw->ack_cumulative_losses; - waiting_rxw->ack_cumulative_losses = waiting_rxw->cumulative_losses; - } -@@ -2705,6 +2706,7 @@ - if (waiting_rxw->ack_cumulative_losses != waiting_rxw->cumulative_losses) - { - transport->has_lost_data = TRUE; -+ memcpy (&(transport->lost_data_tsi), waiting_rxw->identifier, sizeof (pgm_tsi_t)); - waiting_rxw->pgm_sock_err.lost_count = waiting_rxw->cumulative_losses - waiting_rxw->ack_cumulative_losses; - waiting_rxw->ack_cumulative_losses = waiting_rxw->cumulative_losses; - } -@@ -3407,6 +3409,7 @@ - !sender_rxw->waiting_link.data) - { - transport->has_lost_data = TRUE; -+ memcpy (&(transport->lost_data_tsi), sender_rxw->identifier, sizeof (pgm_tsi_t)); - sender_rxw->pgm_sock_err.lost_count = sender_rxw->cumulative_losses - sender_rxw->ack_cumulative_losses; - sender_rxw->ack_cumulative_losses = sender_rxw->cumulative_losses; - -@@ -3823,6 +3826,7 @@ - !peer_rxw->waiting_link.data) - { - transport->has_lost_data = TRUE; -+ memcpy (&(transport->lost_data_tsi), peer_rxw->identifier, sizeof (pgm_tsi_t)); - peer_rxw->pgm_sock_err.lost_count = peer_rxw->cumulative_losses - peer_rxw->ack_cumulative_losses; - peer_rxw->ack_cumulative_losses = peer_rxw->cumulative_losses; - -@@ -3952,6 +3956,7 @@ - !peer_rxw->waiting_link.data) - { - transport->has_lost_data = TRUE; -+ memcpy (&(transport->lost_data_tsi), peer_rxw->identifier, sizeof (pgm_tsi_t)); - peer_rxw->pgm_sock_err.lost_count = peer_rxw->cumulative_losses - peer_rxw->ack_cumulative_losses; - peer_rxw->ack_cumulative_losses = peer_rxw->cumulative_losses; - -@@ -4849,6 +4854,7 @@ - !rxw->waiting_link.data) - { - transport->has_lost_data = TRUE; -+ memcpy (&(transport->lost_data_tsi), rxw->identifier, sizeof (pgm_tsi_t)); - rxw->pgm_sock_err.lost_count = rxw->cumulative_losses - rxw->ack_cumulative_losses; - rxw->ack_cumulative_losses = rxw->cumulative_losses; - -@@ -5166,6 +5172,7 @@ - !rxw->waiting_link.data) - { - transport->has_lost_data = TRUE; -+ memcpy (&(transport->lost_data_tsi), rxw->identifier, sizeof (pgm_tsi_t)); - rxw->pgm_sock_err.lost_count = rxw->cumulative_losses - rxw->ack_cumulative_losses; - rxw->ack_cumulative_losses = rxw->cumulative_losses; - -@@ -5303,6 +5310,7 @@ - !rxw->waiting_link.data) - { - transport->has_lost_data = TRUE; -+ memcpy (&(transport->lost_data_tsi), rxw->identifier, sizeof (pgm_tsi_t)); - rxw->pgm_sock_err.lost_count = rxw->cumulative_losses - rxw->ack_cumulative_losses; - rxw->ack_cumulative_losses = rxw->cumulative_losses; - ---- foreign/openpgm/libpgm-1.2.14/openpgm/pgm/include/pgm/transport.h 2009-08-27 04:53:23.000000000 +0200 -+++ foreign/openpgm/libpgm-1.2.14/openpgm/pgm/include/pgm/transport.h 2009-09-21 15:49:36.000000000 +0200 -@@ -205,6 +205,7 @@ - gboolean is_bound; - gboolean is_open; - gboolean has_lost_data; -+ pgm_tsi_t lost_data_tsi; - gboolean will_close_on_failure; - - gboolean can_send_data; /* and SPMs */ diff --git a/src/Makefile.am b/src/Makefile.am index 3d038b7..bbfa7f5 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -15,23 +15,6 @@ include_HEADERS = ../bindings/c/zmq.h endif endif -if BUILD_PGM1 -pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/timer.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/if.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/gsi.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/signal.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/txwi.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/rxwi.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/transport.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/rate_control.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/async.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/checksum.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/reed_solomon.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/version.c \ - ../foreign/openpgm/@pgm_basename@/openpgm/pgm/galois_tables.c -endif - if BUILD_PGM2 pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/time.c \ @@ -179,48 +162,6 @@ libzmq_la_SOURCES = app_thread.hpp \ libzmq_la_LDFLAGS = -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAFS@ -if BUILD_PGM1 -libzmq_la_CXXFLAGS = -I$(top_srcdir)/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/ -Wall @LIBZMQ_EXTRA_CXXFLAGS@ -libzmq_la_CFLAGS = -I$(top_srcdir)/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/ @LIBZMQ_EXTRA_CXXFLAGS@ \ - -pipe \ - -Wall \ - -Wextra \ - -Wfloat-equal \ - -Wshadow \ - -Wunsafe-loop-optimizations \ - -Wpointer-arith \ - -Wbad-function-cast \ - -Wcast-qual \ - -Wcast-align \ - -Wwrite-strings \ - -Waggregate-return \ - -Wstrict-prototypes \ - -Wold-style-definition \ - -Wmissing-prototypes \ - -Wmissing-declarations \ - -Wmissing-noreturn \ - -Wmissing-format-attribute \ - -Wredundant-decls \ - -Wnested-externs \ - -Winline \ - -pedantic \ - -std=gnu99 \ - --param max-inline-insns-single=600 \ - -D_REENTRANT \ - -D_GNU_SOURCE \ - -D__need_IOV_MAX \ - -DCONFIG_HAVE_EPOLL \ - -DCONFIG_HAVE_RTC \ - -DCONFIG_HAVE_TSC \ - -DCONFIG_HAVE_IFR_NETMASK \ - -DCONFIG_HAVE_GETIFADDRS \ - -DCONFIG_HAVE_GETHOSTBYNAME2 \ - -DCONFIG_HAVE_GETPROTOBYNAME_R \ - -DCONFIG_HAVE_SIGHANDLER_T \ - -DCONFIG_BIND_INADDR_ANY \ - -DCONFIG_GALOIS_MUL_LUT -endif - if BUILD_PGM2 libzmq_la_CXXFLAGS = -I$(top_srcdir)/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/ -Wall @LIBZMQ_EXTRA_CXXFLAGS@ diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index b71becc..5de98b7 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -165,13 +165,8 @@ void zmq::pgm_receiver_t::in_event () peer_info_t peer_info = {false, NULL}; it = peers.insert (std::make_pair (*tsi, peer_info)).first; -#ifdef ZMQ_HAVE_OPENPGM1 - zmq_log (1, "New peer TSI: %s, %s(%i).\n", pgm_print_tsi (tsi), - __FILE__, __LINE__); -#elif ZMQ_HAVE_OPENPGM2 zmq_log (1, "New peer TSI: %s, %s(%i).\n", pgm_tsi_print (tsi), __FILE__, __LINE__); -#endif } // There is not beginning of the message in current APDU and we @@ -197,13 +192,8 @@ void zmq::pgm_receiver_t::in_event () it->second.decoder = new zmq_decoder_t (0); it->second.decoder->set_inout (inout); -#ifdef ZMQ_HAVE_OPENPGM1 zmq_log (1, "Peer %s joined into the stream, %s(%i)\n", - pgm_print_tsi (tsi), __FILE__, __LINE__); -#elif ZMQ_HAVE_OPENPGM2 - zmq_log (1, "Peer %s joined into the stream, %s(%i)\n", pgm_tsi_print (tsi), __FILE__, __LINE__); -#endif } if (nbytes > 0) { diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 964e00b..7686286 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -74,19 +74,13 @@ void zmq::pgm_sender_t::plug (i_inout *inout_) // Alocate 2 fds for PGM socket. int downlink_socket_fd = 0; int uplink_socket_fd = 0; -#ifdef ZMQ_HAVE_OPENPGM2 int rdata_notify_fd = 0; -#endif encoder.set_inout (inout_); // Fill fds from PGM transport. -#ifdef ZMQ_HAVE_OPENPGM1 - pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd); -#elif ZMQ_HAVE_OPENPGM2 pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, &rdata_notify_fd); -#endif // Add downlink_socket_fd into poller. handle = add_fd (downlink_socket_fd); @@ -95,16 +89,12 @@ void zmq::pgm_sender_t::plug (i_inout *inout_) uplink_handle = add_fd (uplink_socket_fd); // Add rdata_notify_fd into the poller. -#ifdef ZMQ_HAVE_OPENPGM2 rdata_notify_handle = add_fd (rdata_notify_fd); -#endif // Set POLLIN. We wont never want to stop polling for uplink = we never // want to stop porocess NAKs. set_pollin (uplink_handle); -#ifdef ZMQ_HAVE_OPENPGM2 set_pollin (rdata_notify_handle); -#endif // Set POLLOUT for downlink_socket_handle. set_pollout (handle); @@ -116,9 +106,7 @@ void zmq::pgm_sender_t::unplug () { rm_fd (handle); rm_fd (uplink_handle); -#ifdef ZMQ_HAVE_OPENPGM2 rm_fd (rdata_notify_handle); -#endif encoder.set_inout (NULL); inout = NULL; } @@ -195,12 +183,6 @@ void zmq::pgm_sender_t::out_event () zmq_log (1, "pgm rate limit reached, %s(%i)\n", __FILE__, __LINE__); } -#ifdef ZMQ_HAVE_OPENPGM1 - // After sending data slice is owned by tx window. - if (nbytes) { - out_buffer = NULL; - } -#endif write_pos += nbytes; } diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 1cebaa8..9a9844c 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -80,9 +80,7 @@ namespace zmq // Poll handle associated with PGM socket. handle_t handle; handle_t uplink_handle; -#ifdef ZMQ_HAVE_OPENPGM2 handle_t rdata_notify_handle; -#endif // Parent session. i_inout *inout; diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 8400b83..60712f4 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -127,13 +127,9 @@ int zmq::pgm_socket_t::open_transport (void) nbytes_processed = 0; pgm_msgv_processed = 0; -#ifdef ZMQ_HAVE_OPENPGM1 - int pgm_ok = 0; -#elif defined ZMQ_HAVE_OPENPGM2 + // TODO: Converting bool to int? Not nice. int pgm_ok = true; - GError *pgm_error = NULL; -#endif // Init PGM transport. // Ensure threading enabled, ensure timer enabled and find PGM protocol id. @@ -162,11 +158,7 @@ int zmq::pgm_socket_t::open_transport (void) gsi_base = uuid_t ().to_string (); } -#ifdef ZMQ_HAVE_OPENPGM1 - rc = pgm_create_custom_gsi (gsi_base.c_str (), &gsi); -#elif defined ZMQ_HAVE_OPENPGM2 - rc = pgm_gsi_create_from_string (&gsi, gsi_base.c_str (), -1); -#endif + rc = pgm_gsi_create_from_string (&gsi, gsi_base.c_str (), -1); if (rc != pgm_ok) { errno = EINVAL; @@ -176,26 +168,6 @@ int zmq::pgm_socket_t::open_transport (void) //zmq_log (1, "Transport GSI: %s, %s(%i)\n", pgm_print_gsi (&gsi), // __FILE__, __LINE__); -#ifdef ZMQ_HAVE_OPENPGM1 - // PGM transport GSRs. - struct group_source_req recv_gsr, send_gsr; - size_t recv_gsr_len = 1; - - // On success, 0 is returned. On invalid arguments, -EINVAL is returned. - // If more multicast groups are found than the recv_len parameter, - // -ENOMEM is returned. - rc = pgm_if_parse_transport (network, AF_INET, &recv_gsr, - &recv_gsr_len, &send_gsr); - if (rc != 0) { - errno = EINVAL; - return -1; - } - - if (recv_gsr_len != 1) { - errno = ENOMEM; - return -1; - } -#elif defined ZMQ_HAVE_OPENPGM2 struct pgm_transport_info_t *res = NULL; if (!pgm_if_get_transport_info (network, NULL, &res, &pgm_error)) { @@ -205,29 +177,13 @@ int zmq::pgm_socket_t::open_transport (void) res->ti_gsi = gsi; res->ti_dport = port_number; -#endif // If we are using UDP encapsulation update gsr or res. if (udp_encapsulation) { -#ifdef ZMQ_HAVE_OPENPGM1 - // Use the same port for UDP encapsulation. - ((struct sockaddr_in*)&send_gsr.gsr_group)->sin_port = - g_htons (port_number); - ((struct sockaddr_in*)&recv_gsr.gsr_group)->sin_port = - g_htons (port_number); -#elif defined ZMQ_HAVE_OPENPGM2 res->ti_udp_encap_ucast_port = port_number; res->ti_udp_encap_mcast_port = port_number; -#endif } -#ifdef ZMQ_HAVE_OPENPGM1 - rc = pgm_transport_create (&transport, &gsi, 0, port_number, &recv_gsr, - 1, &send_gsr); - if (rc != 0) { - return -1; - } -#elif defined ZMQ_HAVE_OPENPGM2 if (!pgm_transport_create (&transport, res, &pgm_error)) { pgm_if_free_transport_info (res); // TODO: tranlate errors from glib into errnos. @@ -236,7 +192,6 @@ int zmq::pgm_socket_t::open_transport (void) } pgm_if_free_transport_info (res); -#endif zmq_log (1, "PGM transport created, %s(%i)\n", __FILE__, __LINE__); @@ -270,11 +225,8 @@ int zmq::pgm_socket_t::open_transport (void) // Set transport->can_send_data = FALSE. // Note that NAKs are still generated by the transport. -#if defined ZMQ_HAVE_OPENPGM1 - rc = pgm_transport_set_recv_only (transport, false); -#elif defined ZMQ_HAVE_OPENPGM2 rc = pgm_transport_set_recv_only (transport, true, false); -#endif + zmq_assert (rc == pgm_ok); // Set NAK transmit back-off interval [us]. @@ -361,23 +313,6 @@ int zmq::pgm_socket_t::open_transport (void) return -1; } -#ifdef ZMQ_HAVE_OPENPGM1 - // Preallocate full transmit window. For simplification always - // worst case is used (40 bytes ipv6 header and 20 bytes UDP - // encapsulation). - int to_preallocate = options.recovery_ivl * (options.rate * 1000 / 8) - / (pgm_max_tpdu - 40 - 20); - - rc = pgm_transport_set_txw_preallocate (transport, to_preallocate); - if (rc != 0) { - errno = EINVAL; - return -1; - } - - zmq_log (2, "Preallocated %i slices in TX window. %s(%i)\n", - to_preallocate, __FILE__, __LINE__); -#endif - // Set interval of background SPM packets [us]. rc = pgm_transport_set_ambient_spm (transport, 8192 * 1000); zmq_assert (rc == pgm_ok); @@ -398,17 +333,10 @@ int zmq::pgm_socket_t::open_transport (void) } // Bind a transport to the specified network devices. -#ifdef ZMQ_HAVE_OPENPGM1 - rc = pgm_transport_bind (transport); - if (rc != 0) { - return -1; - } -#elif defined ZMQ_HAVE_OPENPGM2 - if (!pgm_transport_bind (transport, &pgm_error)) { + if (!pgm_transport_bind (transport, &pgm_error)) { // TODO: tranlate errors from glib into errnos. return -1; - } -#endif + } zmq_log (1, "PGM transport bound, %s(%i)\n", __FILE__, __LINE__); @@ -444,28 +372,6 @@ int zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_, zmq_assert (receive_fd_); zmq_assert (waiting_pipe_fd_); -#if defined ZMQ_HAVE_OPENPGM1 - // For POLLIN there are 2 pollfds in pgm_transport. - int fds_array_size = pgm_receiver_fd_count; - pollfd *fds = new pollfd [fds_array_size]; - memset (fds, '\0', fds_array_size * sizeof (fds)); - - // Retrieve pollfds from pgm_transport. - int rc = pgm_transport_poll_info (transport, fds, &fds_array_size, - POLLIN); - - // pgm_transport_poll_info has to return 2 pollfds for POLLIN. - // Note that fds_array_size parameter can be - // changed inside pgm_transport_poll_info call. - zmq_assert (rc == pgm_receiver_fd_count); - - // Store pfds into user allocated space. - *receive_fd_ = fds [0].fd; - *waiting_pipe_fd_ = fds [1].fd; - - delete [] fds; - -#elif defined ZMQ_HAVE_OPENPGM2 // recv_sock2 should not be used - check it. zmq_assert (transport->recv_sock2 == -1); @@ -476,7 +382,6 @@ int zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_, // Take FDs directly from transport. *receive_fd_ = pgm_transport_get_recv_fd (transport); *waiting_pipe_fd_ = pgm_transport_get_pending_fd (transport); -#endif return pgm_receiver_fd_count; } @@ -491,31 +396,6 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, zmq_assert (send_fd_); zmq_assert (receive_fd_); -#if defined ZMQ_HAVE_OPENPGM1 - zmq_assert (!rdata_notify_fd_); - - // Preallocate pollfds array. - int fds_array_size = pgm_sender_fd_count; - pollfd *fds = new pollfd [fds_array_size]; - memset (fds, '\0', fds_array_size * sizeof (fds)); - - // Retrieve pollfds from pgm_transport. - int rc = pgm_transport_poll_info (transport, fds, &fds_array_size, - POLLOUT | POLLIN); - - // pgm_transport_poll_info has to return one pollfds for POLLOUT and - // second for POLLIN. - // Note that fds_array_size parameter can be - // changed inside pgm_transport_poll_info call. - zmq_assert (rc == pgm_sender_fd_count); - - // Store pfds into user allocated space. - *receive_fd_ = fds [0].fd; - *send_fd_ = fds [1].fd; - - delete [] fds; - -#elif defined ZMQ_HAVE_OPENPGM2 zmq_assert (rdata_notify_fd_); // recv_sock2 should not be used - check it. @@ -529,7 +409,6 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, *receive_fd_ = pgm_transport_get_recv_fd (transport); *rdata_notify_fd_ = pgm_transport_get_repair_fd (transport); *send_fd_ = pgm_transport_get_send_fd (transport); -#endif return pgm_sender_fd_count; } @@ -537,27 +416,6 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, // Send one APDU, transmit window owned memory. size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) { - -#if defined ZMQ_HAVE_OPENPGM1 - - ssize_t nbytes = 0; - iovec iov = {data_,data_len_}; - - nbytes = pgm_transport_send_packetv (transport, &iov, 1, - MSG_DONTWAIT | MSG_WAITALL, true); - - zmq_assert (nbytes != -EINVAL); - - if (nbytes == -1 && errno != EAGAIN) { - errno_assert (false); - } - - // If nbytes is -1 and errno is EAGAIN means that we can not send data - // now. We have to call write_one_pkt again. - nbytes = nbytes == -1 ? 0 : nbytes; - -#elif defined ZMQ_HAVE_OPENPGM2 - size_t nbytes = 0; PGMIOStatus status = pgm_send (transport, data_, data_len_, &nbytes); @@ -569,7 +427,6 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED); zmq_assert (nbytes == 0); } -#endif zmq_log (4, "wrote %i/%iB, %s(%i)\n", (int) nbytes, (int) data_len_, __FILE__, __LINE__); @@ -618,26 +475,17 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_) // Store size. *size_ = get_max_tsdu_size (); -#if defined ZMQ_HAVE_OPENPGM1 - // Allocate one packet in tx window. - return pgm_packetv_alloc (transport, false); -#elif defined ZMQ_HAVE_OPENPGM2 // Allocate buffer. unsigned char *apdu_buff = new unsigned char [*size_]; zmq_assert (apdu_buff); return apdu_buff; -#endif } // Return an unused packet allocated from the transmit window // via pgm_packetv_alloc(). void zmq::pgm_socket_t::free_buffer (void *data_) { -#if defined ZMQ_HAVE_OPENPGM1 - pgm_packetv_free1 (transport, data_, false); -#elif defined ZMQ_HAVE_OPENPGM2 delete [] (unsigned char*) data_; -#endif } @@ -672,42 +520,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // Receive a vector of Application Protocol Domain Unit's (APDUs) // from the transport. -#ifdef ZMQ_HAVE_OPENPGM1 - nbytes_rec = pgm_transport_recvmsgv (transport, pgm_msgv, - pgm_msgv_len, MSG_DONTWAIT); - - // In a case when no ODATA/RDATA fired POLLIN event (SPM...) - // pgm_transport_recvmsg returns -1 with errno == EAGAIN. - if (nbytes_rec == -1 && errno == EAGAIN) { - - // In case if no RDATA/ODATA caused POLLIN 0 is - // returned. - nbytes_rec = 0; - return 0; - } - - // For data loss nbytes_rec == -1 errno == ECONNRESET. - if (nbytes_rec == -1 && errno == ECONNRESET) { - - // Save lost data TSI. - *tsi_ = &transport->lost_data_tsi; - - zmq_log (1, "Data loss detected %s, %s(%i)\n", - pgm_print_tsi (&transport->lost_data_tsi), __FILE__, __LINE__); - - nbytes_rec = 0; - - // In case of dala loss -1 is returned. - return -1; - } - - // Catch the rest of the errors. - if (nbytes_rec <= 0) { - zmq_log (2, "received %i B, errno %i, %s(%i).\n", (int)nbytes_rec, - errno, __FILE__, __LINE__); - errno_assert (false); - } -#elif defined ZMQ_HAVE_OPENPGM2 GError *pgm_error = NULL; const PGMIOStatus status = pgm_recvmsgv (transport, pgm_msgv, @@ -760,7 +572,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) nbytes_rec = 0; return -1; } -#endif zmq_log (4, "received %i bytes\n", (int)nbytes_rec); @@ -768,17 +579,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) zmq_assert (nbytes_rec > 0); -#ifdef ZMQ_HAVE_OPENPGM1 - // Only one APDU per pgm_msgv_t structure is allowed. - zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_iovlen == 1); - - // Take pointers from pgm_msgv_t structure. - *raw_data_ = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_base; - raw_data_len = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_len; - - // Save current TSI. - *tsi_ = pgm_msgv [pgm_msgv_processed].msgv_tsi; -#elif defined ZMQ_HAVE_OPENPGM2 // Only one APDU per pgm_msgv_t structure is allowed. zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1); @@ -791,7 +591,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // Save current TSI. *tsi_ = &skb->tsi; -#endif // Move the the next pgm_msgv_t structure. pgm_msgv_processed++; @@ -808,19 +607,6 @@ void zmq::pgm_socket_t::process_upstream (void) pgm_msgv_t dummy_msg; -#ifdef ZMQ_HAVE_OPENPGM1 - ssize_t dummy_bytes = 0; - - // We acctually do not want to read any data here we are going to - // process NAK. - - dummy_bytes = pgm_transport_recvmsgv (transport, &dummy_msg, - 1, MSG_DONTWAIT); - - // No data should be returned. - zmq_assert (dummy_bytes == -1 && errno == EAGAIN); - -#elif defined ZMQ_HAVE_OPENPGM2 size_t dummy_bytes = 0; GError *pgm_error = NULL; @@ -833,8 +619,6 @@ void zmq::pgm_socket_t::process_upstream (void) // No data should be returned. zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || status == PGM_IO_STATUS_RATE_LIMITED)); -#endif - } #endif diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index 6c67306..d89d19c 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -114,35 +114,19 @@ namespace zmq pgm_msgv_t *pgm_msgv; // How many bytes were read from pgm socket. -#ifdef ZMQ_HAVE_OPENPGM1 - ssize_t nbytes_rec; -#elif defined ZMQ_HAVE_OPENPGM2 size_t nbytes_rec; -#endif // How many bytes were processed from last pgm socket read. -#ifdef ZMQ_HAVE_OPENPGM1 - ssize_t nbytes_processed; -#elif defined ZMQ_HAVE_OPENPGM2 size_t nbytes_processed; -#endif // How many messages from pgm_msgv were already sent up. -#ifdef ZMQ_HAVE_OPENPGM1 - ssize_t pgm_msgv_processed; -#elif defined ZMQ_HAVE_OPENPGM2 size_t pgm_msgv_processed; -#endif // Size of pgm_msgv array. size_t pgm_msgv_len; // Sender transport uses 2 fd. -#ifdef ZMQ_HAVE_OPENPGM1 - enum {pgm_sender_fd_count = 2}; -#elif ZMQ_HAVE_OPENPGM2 enum {pgm_sender_fd_count = 3}; -#endif // Receiver transport uses 2 fd. enum {pgm_receiver_fd_count = 2}; |