diff options
| -rw-r--r-- | Makefile.am | 4 | ||||
| -rw-r--r-- | c/zmq.h | 2 | ||||
| -rw-r--r-- | src/Makefile.am | 6 | ||||
| -rw-r--r-- | src/app_thread.cpp | 2 | ||||
| -rw-r--r-- | src/config.hpp | 4 | ||||
| -rw-r--r-- | src/options.cpp | 4 | ||||
| -rw-r--r-- | src/options.hpp | 6 | ||||
| -rw-r--r-- | src/pgm_sender.cpp | 224 | ||||
| -rw-r--r-- | src/pgm_sender.hpp | 109 | ||||
| -rw-r--r-- | src/pgm_socket.cpp | 754 | ||||
| -rw-r--r-- | src/pgm_socket.hpp | 153 | ||||
| -rw-r--r-- | src/socket_base.cpp | 105 | ||||
| -rw-r--r-- | src/socket_base.hpp | 5 | ||||
| -rw-r--r-- | src/sub.cpp | 2 | 
14 files changed, 1360 insertions, 20 deletions
| diff --git a/Makefile.am b/Makefile.am index 335d988..18636c0 100644 --- a/Makefile.am +++ b/Makefile.am @@ -17,5 +17,7 @@ endif  SUBDIRS = src $(DIR_P) $(DIR_R) $(DIR_J) $(DIR_PERF)  DIST_SUBDIRS = src python ruby java perf +EXTRA_DIST =  $(top_srcdir)/foreign/openpgm/@pgm_basename@.tar.bz2 +  dist-hook: -		-rm -rf $(distdir)/third-party/openpgm/$pgm_basename +		-rm -rf $(distdir)/foreign/openpgm/@pgm_basename@ @@ -52,6 +52,8 @@ extern "C" {  #define ZMQ_IDENTITY 6  #define ZMQ_SUBSCRIBE 7  #define ZMQ_UNSUBSCRIBE 8 +#define ZMQ_RATE 9 +#define ZMQ_RECOVERY_IVL 10  //  The operation should be performed in non-blocking mode. I.e. if it cannot  //  be processed immediately, error should be returned with errno set to EAGAIN. diff --git a/src/Makefile.am b/src/Makefile.am index a09c001..ce88b26 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -62,6 +62,8 @@ libzmq_la_SOURCES = $(pgm_sources) \      object.hpp \      options.hpp \      owned.hpp \ +    pgm_sender.hpp \ +    pgm_socket.hpp \      pipe.hpp \      platform.hpp \      poll.hpp \ @@ -102,6 +104,8 @@ libzmq_la_SOURCES = $(pgm_sources) \      object.cpp \      options.cpp \      owned.cpp \ +    pgm_sender.cpp \ +    pgm_socket.cpp \      pipe.cpp \      poll.cpp \      pub.cpp \ @@ -124,7 +128,7 @@ libzmq_la_SOURCES = $(pgm_sources) \      zmq_listener.cpp \      zmq_listener_init.cpp -libzmq_la_LDFLAGS = -version-info @LTVER@ +libzmq_la_LDFLAGS = -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAFS@  if BUILD_PGM  libzmq_la_CXXFLAGS = -I$(top_srcdir)/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/ -Wall @LIBZMQ_EXTRA_CXXFLAGS@ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index c48657a..feaa4d6 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -148,7 +148,7 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)      case ZMQ_P2P:      case ZMQ_REQ:      case ZMQ_REP: -        s = new socket_base_t (this); +        s = new socket_base_t (this, type_);          break;      default:          //  TODO: This should be EINVAL. diff --git a/src/config.hpp b/src/config.hpp index 17e67b9..43a4513 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -70,8 +70,10 @@ namespace zmq          //  Maximal number of non-accepted connections that can be held by          //  TCP listener object. -        tcp_connection_backlog = 10 +        tcp_connection_backlog = 10, +        //  Maximum transport data unit size for PGM (TPDU). +        pgm_max_tpdu = 1500      };  } diff --git a/src/options.cpp b/src/options.cpp index cd07c44..804cb4f 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -24,6 +24,8 @@ zmq::options_t::options_t () :      lwm (0),      swap (0),      mask (0), -    affinity (0) +    affinity (0), +    rate (0), +    recovery_ivl (0)  {  } diff --git a/src/options.hpp b/src/options.hpp index faf21b8..9f4a264 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -37,6 +37,12 @@ namespace zmq          uint64_t mask;          uint64_t affinity;          std::string identity; + +        //  Maximum tranfer rate [kb/s]. +        uint32_t rate; + +        //  Reliability time interval [s]. +        uint32_t recovery_ivl;      };  } diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp new file mode 100644 index 0000000..60b4c3a --- /dev/null +++ b/src/pgm_sender.cpp @@ -0,0 +1,224 @@ +/* +    Copyright (c) 2007-2009 FastMQ Inc. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "platform.hpp" + +#if defined ZMQ_HAVE_OPENPGM + +#include <iostream> + +#include "io_thread.hpp" +#include "pgm_sender.hpp" +#include "err.hpp" +#include "wire.hpp" + +//#define PGM_SENDER_DEBUG +//#define PGM_SENDER_DEBUG_LEVEL 1 + +// level 1 = key behaviour +// level 2 = processing flow +// level 4 = infos + +#ifndef PGM_SENDER_DEBUG +#   define zmq_log(n, ...)  while (0) +#else +#   define zmq_log(n, ...)    do { if ((n) <= PGM_SENDER_DEBUG_LEVEL) \ +        { printf (__VA_ARGS__);}} while (0) +#endif + +#ifdef ZMQ_HAVE_LINUX +zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,  +      const options_t &options_, const char *session_name_) : +    io_object_t (parent_), +    pgm_socket (false, options_), +    options (options_), +    session_name (session_name_), +    inout (NULL), +    out_buffer (NULL), +    out_buffer_size (0), +    write_size (0), +    write_pos (0),  +    first_message_offset (-1) +{ + +} + +int zmq::pgm_sender_t::init (const char *network_) +{ +    return pgm_socket.init (network_); +} + +void zmq::pgm_sender_t::plug (i_inout *inout_) +{ +     +    //  Alocate 2 fds for PGM socket. +    int downlink_socket_fd; +    int uplink_socket_fd; + +    encoder.set_inout (inout_); + +    //  Fill fds from PGM transport. +    pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd); + +    //  Add downlink_socket_fd into poller. +    handle = add_fd (downlink_socket_fd); + +    //  Add uplink_socket_fd into the poller. +    uplink_handle = add_fd (uplink_socket_fd); + +    //  Set POLLIN. We wont never want to stop polling for uplink = we never +    //  want to stop porocess NAKs. +    set_pollin (uplink_handle); + +    //  Set POLLOUT for downlink_socket_handle. +    set_pollout (handle); + +    inout = inout_; + +    zmq_log (1, "plug: downlink_socket_fd %i, uplink_socket_fd %i, %s(%i)", +        downlink_socket_fd, uplink_socket_fd, __FILE__, __LINE__); + +    std::cout << std::flush; +} + +void zmq::pgm_sender_t::unplug () +{ +    rm_fd (handle); +    rm_fd (uplink_handle); +    encoder.set_inout (NULL); +    inout = NULL; +} + +void zmq::pgm_sender_t::revive () +{ +    set_pollout (handle); +} + +zmq::pgm_sender_t::~pgm_sender_t () +{ +    if (out_buffer) { +        pgm_socket.free_buffer (out_buffer); +    } +} + +//  In event on sender side means NAK or SPMR receiving from some peer. +void zmq::pgm_sender_t::in_event () +{ +    pgm_socket.process_upstream (); +} + +void zmq::pgm_sender_t::out_event () +{ + +    //  POLLOUT event from send socket. If write buffer is empty,  +    //  try to read new data from the encoder. +    if (write_pos == write_size) { + +        //  Get buffer if we do not have already one. +        if (!out_buffer) { +            out_buffer = (unsigned char*)  +                pgm_socket.get_buffer (&out_buffer_size); +        } + +        assert (out_buffer_size > 0); + +        //  First two bytes /sizeof (uint16_t)/ are used to store message  +        //  offset in following steps. +        write_size = encoder.read (out_buffer + sizeof (uint16_t),  +            out_buffer_size - sizeof (uint16_t), &first_message_offset); +        write_pos = 0; + +        //  If there are no data to write stop polling for output. +        if (!write_size) { +            reset_pollout (handle); +        } else { +            // Addning uint16_t for offset in a case when encoder returned > 0B. +            write_size += sizeof (uint16_t); +        } +    } + +    //  If there are any data to write, write them into the socket. +    //  Note that all data has to written in one write_one_pkt_with_offset call. +    if (write_pos < write_size) { +        size_t nbytes = write_one_pkt_with_offset (out_buffer + write_pos,  +            write_size - write_pos, (uint16_t) first_message_offset); + +        //  We can write all data or 0 which means rate limit reached. +        if (write_size - write_pos != nbytes && nbytes != 0) { +            zmq_log (1, "write_size - write_pos %i, nbytes %i, %s(%i)", +                (int)(write_size - write_pos), (int)nbytes, __FILE__, __LINE__); +            assert (false); +        } + +        //  PGM rate limit reached nbytes is 0. +        if (!nbytes) { +            zmq_log (1, "pgm rate limit reached, %s(%i)\n", __FILE__, __LINE__); +        } + +        //  After sending data slice is owned by tx window. +        if (nbytes) { +            out_buffer = NULL; +        } + +        write_pos += nbytes; +    } + +} + +/* +void zmq::bp_pgm_sender_t::revive (pipe_t *pipe_) +{ +    //  We have some messages in encoder. +    if (!shutting_down) { +                 +        //  Forward the revive command to the pipe.  +        engine_base_t <false, true>::revive (pipe_); + +        //  There is at least one engine (that one which sent revive) that  +        //  has messages ready. Try to write data to the socket, thus  +        //  eliminating one polling for POLLOUT event. +        //  Note that if write_size is zero it means that buffer is empty and +        //  we can read data from encoder. +        if (!write_size) { +            poller->set_pollout (handle); +            out_event (handle); +        } +    } +} + +*/ +size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_,  +    size_t size_, uint16_t offset_) +{ +    zmq_log (1, "data_size %i, first message offset %i, %s(%i)", +        (int) size_, offset_, __FILE__, __LINE__); + +    std::cout << std::flush; + +    //  Put offset information in the buffer. +    put_uint16 (data_, offset_); +    +    //  Send data. +    size_t nbytes = pgm_socket.send (data_, size_); + +    return nbytes; +} +#endif + +#endif diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp new file mode 100644 index 0000000..2ad2a15 --- /dev/null +++ b/src/pgm_sender.hpp @@ -0,0 +1,109 @@ +/* +    Copyright (c) 2007-2009 FastMQ Inc. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_BP_PGM_SENDER_HPP_INCLUDED__ +#define __ZMQ_BP_PGM_SENDER_HPP_INCLUDED__ + +#include "platform.hpp" + +#if defined ZMQ_HAVE_OPENPGM + +#include <vector> + +#include "stdint.hpp" +#include "io_object.hpp" +#include "i_engine.hpp" +#include "options.hpp" +#include "pgm_socket.hpp" +#include "zmq_encoder.hpp" + +namespace zmq +{ + +    class pgm_sender_t : public io_object_t, public i_engine +    { + +    public: +        pgm_sender_t (class io_thread_t *parent_, const options_t &options_,  +            const char *session_name_); +        ~pgm_sender_t (); + +        int init (const char *network_); + +        //  i_engine interface implementation. +        void plug (struct i_inout *inout_); +        void unplug (); +        void revive (); + +        //  i_poll_events interface implementation. +        void in_event (); +        void out_event (); + +    private: + +        //  Send one APDU with first message offset information.  +        //  Note that first 2 bytes in data_ are used to store the offset_ +        //  and thus user data has to start at data_ + sizeof (uint16_t). +        size_t write_one_pkt_with_offset (unsigned char *data_, size_t size_, +            uint16_t offset_); + +        //  Message encoder. +        zmq_encoder_t encoder; + +        //  PGM socket. +        pgm_socket_t pgm_socket; + +        //  Socket options. +        options_t options; + +        //  Name of the session associated with the connecter. +        std::string session_name; + +        //  Poll handle associated with PGM socket. +        handle_t handle; +        handle_t uplink_handle; + +        //  ? +        i_inout *inout; + +        //  Output buffer from pgm_socket. +#ifdef ZMQ_HAVE_WINDOWS +        unsigned char out_buffer [pgm_win_max_apdu]; +#else +        unsigned char *out_buffer; +         +        //  Output buffer size. +        size_t out_buffer_size; +#endif + +        size_t write_size; +        size_t write_pos; + +        //  Offset of the first mesage in data chunk taken from encoder. +        int first_message_offset; + +        pgm_sender_t (const pgm_sender_t&); +        void operator = (const pgm_sender_t&); +    }; + +} + +#endif + +#endif diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp new file mode 100644 index 0000000..5e2e764 --- /dev/null +++ b/src/pgm_socket.cpp @@ -0,0 +1,754 @@ +/* +    Copyright (c) 2007-2009 FastMQ Inc. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "platform.hpp" + +#if defined ZMQ_HAVE_OPENPGM + +#ifdef ZMQ_HAVE_LINUX +#include <pgm/pgm.h> +#else  +#include <Winsock2.h> +#include <Wsrm.h> +#include <ws2spi.h> +#endif + +#include <string> +#include <iostream> + +#include "options.hpp" +#include "pgm_socket.hpp" +#include "config.hpp" +#include "err.hpp" + +//#define PGM_SOCKET_DEBUG +//#define PGM_SOCKET_DEBUG_LEVEL 1 + +// level 1 = key behaviour +// level 2 = processing flow +// level 4 = infos + +#ifndef PGM_SOCKET_DEBUG +#   define zmq_log(n, ...)  while (0) +#else +#   define zmq_log(n, ...)    do { if ((n) <= PGM_SOCKET_DEBUG_LEVEL) \ +        { printf (__VA_ARGS__);}} while (0) +#endif + +#ifdef ZMQ_HAVE_LINUX + +zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : +    g_transport (NULL), +    options (options_), +    receiver (receiver_), +    port_number (0), +    udp_encapsulation (false), +    pgm_msgv (NULL), +    nbytes_rec (0), +    nbytes_processed (0), +    pgm_msgv_processed (0), +    pgm_msgv_len (0) +{ +     +} + +int zmq::pgm_socket_t::init (const char *network_) +{ +    //  Check if we are encapsulating into UDP, natwork string has to  +    //  start with udp:. +    const char *network_ptr = network_; + +    if (strlen (network_) >= 4 && network_ [0] == 'u' &&  +          network_ [1] == 'd' && network_ [2] == 'p' &&  +          network_ [3] == ':') { +     +        //  Shift interface_ptr after ':'. +        network_ptr += 4; + +        udp_encapsulation = true; +    } +  +    //  Parse port number. +    const char *port_delim = strchr (network_ptr, ':'); +    if (!port_delim) { +        errno = EINVAL; +        return -1; +    } + +    port_number = atoi (port_delim + 1); +   +    //  Store interface string. +    if (port_delim <= network_ptr) { +        errno = EINVAL; +        return -1; +    } + +    if (port_delim - network_ptr >= (int) sizeof (network) - 1) { +        errno = EINVAL; +        return -1; +    } + +    memset (network, '\0', sizeof (network)); +    memcpy (network, network_ptr, port_delim - network_ptr); + + +    zmq_log (1, "parsed: network  %s, port %i, udp encaps. %s, %s(%i)\n",  +        network, port_number, udp_encapsulation ? "yes" : "no", +        __FILE__, __LINE__); + +    //  Open PGM transport. +    int rc = open_transport (); +    if (rc != 0) +        return -1; + +    //  For receiver transport preallocate pgm_msgv array. +    //  in_batch_size configured in confing.hpp +    if (receiver) { +        pgm_msgv_len = get_max_apdu_at_once (in_batch_size); +        pgm_msgv = new pgm_msgv_t [pgm_msgv_len]; +    } + +    return 0; +} + +int zmq::pgm_socket_t::open_transport (void) +{ + +    zmq_log (1, "Opening PGM: network  %s, port %i, udp encaps. %s, %s(%i)\n", +        network, port_number, udp_encapsulation ? "yes" : "no",  +        __FILE__, __LINE__); + +    //  Can not open transport before destroying old one.  +    zmq_assert (g_transport == NULL); + +    //  Set actual_tsi and prev_tsi to zeros. +    memset (&tsi, '\0', sizeof (pgm_tsi_t)); +    memset (&retired_tsi, '\0', sizeof (pgm_tsi_t)); + +    //  Zero counter used in msgrecv. +    nbytes_rec = 0; +    nbytes_processed = 0; +    pgm_msgv_processed = 0; + +    //  Init PGM transport. +    //  Ensure threading enabled, ensure timer enabled and find PGM protocol id. +    // +    //  Note that if you want to use gettimeofday and sleep for openPGM timing, +    //  set environment variables PGM_TIMER to "GTOD"  +    //  and PGM_SLEEP to "USLEEP". +    int rc = pgm_init (); +    if (rc != 0) { +        errno = EINVAL; +        return -1; +    } + +    //  PGM transport GSI. +    pgm_gsi_t gsi; +  +    //  PGM transport GSRs. +    struct group_source_req recv_gsr, send_gsr; +    size_t recv_gsr_len = 1; + +    rc = pgm_create_md5_gsi (&gsi); +    if (rc != 0) { +        errno = EINVAL; +        return -1; +    } + +    //  On success, 0 is returned. On invalid arguments, -EINVAL is returned.  +    //  If more multicast groups are found than the recv_len parameter,  +    //  -ENOMEM is returned. +    rc = pgm_if_parse_transport (network, AF_INET, &recv_gsr,  +        &recv_gsr_len, &send_gsr); +    if (rc != 0) { +        errno = EINVAL; +        return -1; +    } + +    if (recv_gsr_len != 1) { +        errno = ENOMEM; +        return -1; +    } + +    //  If we are using UDP encapsulation update send_gsr & recv_gsr  +    //  structures. Note that send_gsr & recv_gsr has to be updated after  +    //  pgm_if_parse_transport call. +    if (udp_encapsulation) { + +        //  Use the same port for UDP encapsulation. +        ((struct sockaddr_in*)&send_gsr.gsr_group)->sin_port =  +            g_htons (port_number); +	((struct sockaddr_in*)&recv_gsr.gsr_group)->sin_port =  +            g_htons (port_number); +    } + +    rc = pgm_transport_create (&g_transport, &gsi, 0, port_number, &recv_gsr,  +        1, &send_gsr); +    if (rc != 0) { +        return -1; +    } + +    //  Common parameters for receiver and sender. + +    //  Set maximum transport protocol data unit size (TPDU). +    rc = pgm_transport_set_max_tpdu (g_transport, pgm_max_tpdu); +    if (rc != 0) { +        errno = EINVAL; +        return -1; +    } + +    //  Set maximum number of network hops to cross. +    rc = pgm_transport_set_hops (g_transport, 16); +    if (rc != 0) { +        errno = EINVAL; +        return -1; +    } + +    //  Receiver transport. +    if (receiver) { + +        //  Set transport->may_close_on_failure to true, +        //  after data los recvmsgv returns -1 errno set to ECONNRESET. +        rc = pgm_transport_set_close_on_failure (g_transport, TRUE); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } + +        //  Set transport->can_send_data = FALSE. +        //  Note that NAKs are still generated by the transport. +        rc = pgm_transport_set_recv_only (g_transport, false); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } + +        //  Set NAK transmit back-off interval [us]. +        rc = pgm_transport_set_nak_bo_ivl (g_transport, 50*1000); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } + +        //  Set timeout before repeating NAK [us]. +        rc = pgm_transport_set_nak_rpt_ivl (g_transport, 200*1000); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } + +        //  Set timeout for receiving RDATA. +        rc = pgm_transport_set_nak_rdata_ivl (g_transport, 200*1000); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } + +        //  Set retries for NAK without NCF/DATA (NAK_DATA_RETRIES). +        rc = pgm_transport_set_nak_data_retries (g_transport, 5); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } + +        //  Set retries for NCF after NAK (NAK_NCF_RETRIES). +        rc = pgm_transport_set_nak_ncf_retries (g_transport, 2); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } + +        //  Set timeout for removing a dead peer [us]. +        rc = pgm_transport_set_peer_expiry (g_transport, 5*8192*1000); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } + +        //  Set expiration time of SPM Requests [us]. +        rc = pgm_transport_set_spmr_expiry (g_transport, 25*1000); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } + +        //  Set the size of the receive window. +        // +        //  data rate [B/s]  (options.rate is kb/s). +        if (options.rate <= 0) { +            errno = EINVAL; +            return -1; +        } + +        rc = pgm_transport_set_rxw_max_rte (g_transport,  +            options.rate * 1000 / 8); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } + +        //  Recovery interval [s].  +        if (options.recovery_ivl <= 0) { +            errno = EINVAL; +            return -1; +        } + +        rc = pgm_transport_set_rxw_secs (g_transport, options.recovery_ivl); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } + +    //  Sender transport. +    } else { + +        //  Set transport->can_recv = FALSE, waiting_pipe wont not be read. +        rc = pgm_transport_set_send_only (g_transport, TRUE); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } + +        //  Set the size of the send window. +        // +        //  data rate [B/s]  (options.rate is kb/s). +        if (options.rate <= 0) { +            errno = EINVAL; +            return -1; +        } + +        rc = pgm_transport_set_txw_max_rte (g_transport,  +            options.rate * 1000 / 8); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } + +        //  Recovery interval [s].  +        if (options.recovery_ivl <= 0) { +            errno = EINVAL; +            return -1; +        } + +        rc = pgm_transport_set_txw_secs (g_transport, options.recovery_ivl); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } + +        //  Preallocate full transmit window. For simplification always  +        //  worst case is used (40 bytes ipv6 header and 20 bytes UDP  +        //  encapsulation). +        int to_preallocate = options.recovery_ivl * (options.rate * 1000 / 8)  +            / (pgm_max_tpdu - 40 - 20); + +        rc = pgm_transport_set_txw_preallocate (g_transport, to_preallocate); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } + +        zmq_log (1, "Preallocated %i slices in TX window. %s(%i)\n",  +            to_preallocate, __FILE__, __LINE__); + +        //  Set interval of background SPM packets [us]. +        rc = pgm_transport_set_ambient_spm (g_transport, 8192 * 1000); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } + +        //  Set intervals of data flushing SPM packets [us]. +        guint spm_heartbeat[] = {4 * 1000, 4 * 1000, 8 * 1000, 16 * 1000,  +            32 * 1000, 64 * 1000, 128 * 1000, 256 * 1000, 512 * 1000,  +            1024 * 1000, 2048 * 1000, 4096 * 1000, 8192 * 1000}; +         +	rc = pgm_transport_set_heartbeat_spm (g_transport, spm_heartbeat,  +            G_N_ELEMENTS(spm_heartbeat)); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        } +    } + +    //  Enable multicast loopback. +    rc = pgm_transport_set_multicast_loop (g_transport, true); +    if (rc != 0) { +        errno = EINVAL; +        return -1; +    } + +    //  Bind a transport to the specified network devices. +    rc = pgm_transport_bind (g_transport); +    if (rc != 0) { +        return -1; +    } + +    return 0; +} + +zmq::pgm_socket_t::~pgm_socket_t () +{ +    //  Celanup. +    if (pgm_msgv) { +        delete [] pgm_msgv; +    } + +    if (g_transport) +        close_transport (); +} + +void zmq::pgm_socket_t::close_transport (void) +{    +    //  g_transport has to be valid. +    zmq_assert (g_transport); + +    pgm_transport_destroy (g_transport, TRUE); + +    g_transport = NULL; +} + +//   Get receiver fds. recv_fd is from transport->recv_sock +//   waiting_pipe_fd is from transport->waiting_pipe [0] +int zmq::pgm_socket_t::get_receiver_fds (int *recv_fd_,  +    int *waiting_pipe_fd_) +{ + +    //  For POLLIN there are 2 pollfds in pgm_transport. +    int fds_array_size = pgm_receiver_fd_count; +    pollfd *fds = new pollfd [fds_array_size]; +    memset (fds, '\0', fds_array_size * sizeof (fds)); + +    //  Retrieve pollfds from pgm_transport. +    int rc = pgm_transport_poll_info (g_transport, fds, &fds_array_size,  +        POLLIN); + +    //  pgm_transport_poll_info has to return 2 pollfds for POLLIN.  +    //  Note that fds_array_size parameter can be  +    //  changed inside pgm_transport_poll_info call. +    zmq_assert (rc == pgm_receiver_fd_count); +  +    //  Store pfds into user allocated space. +    *recv_fd_ = fds [0].fd; +    *waiting_pipe_fd_ = fds [1].fd; + +    delete [] fds; + +    return pgm_receiver_fd_count; +} + +//   Get fds and store them into user allocated memory.  +//   sender_fd is from pgm_transport->send_sock. +//   receive_fd_ is from  transport->recv_sock. +int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_) +{ + +    //  Preallocate pollfds array. +    int fds_array_size = pgm_sender_fd_count; +    pollfd *fds = new pollfd [fds_array_size]; +    memset (fds, '\0', fds_array_size * sizeof (fds)); + +    //  Retrieve pollfds from pgm_transport +    int rc = pgm_transport_poll_info (g_transport, fds, &fds_array_size,  +        POLLOUT | POLLIN); + +    //  pgm_transport_poll_info has to return one pollfds for POLLOUT and +    //  second for POLLIN. +    //  Note that fds_array_size parameter can be  +    //  changed inside pgm_transport_poll_info call. +    zmq_assert (rc == pgm_sender_fd_count); +  +    //  Store pfds into user allocated space. +    *receive_fd_ = fds [0].fd; +    *send_fd_ = fds [1].fd; + +    delete [] fds; + +    return pgm_sender_fd_count; +} + +//  Send one APDU, transmit window owned memory. +size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) +{ +    iovec iov = {data_,data_len_}; + +    ssize_t nbytes = pgm_transport_send_packetv (g_transport, &iov, 1,  +        MSG_DONTWAIT | MSG_WAITALL, true); + +    zmq_assert (nbytes != -EINVAL); + +    if (nbytes == -1 && errno != EAGAIN) { +        errno_assert (false); +    } + +    //  If nbytes is -1 and errno is EAGAIN means that we can not send data  +    //  now. We have to call write_one_pkt again. +    nbytes = nbytes == -1 ? 0 : nbytes; + +    zmq_log (4, "wrote %iB, %s(%i)\n", (int)nbytes, __FILE__, __LINE__); +     +    // We have to write all data as one packet. +    if (nbytes > 0) { +        zmq_assert (nbytes == (ssize_t)data_len_); +    } + +    return nbytes; +} + +//  Return max TSDU size without fragmentation from current PGM transport. +size_t zmq::pgm_socket_t::get_max_tsdu_size (void) +{ +    return (size_t)pgm_transport_max_tsdu (g_transport, false); +} + +//  Returns how many APDUs are needed to fill reading buffer. +size_t zmq::pgm_socket_t::get_max_apdu_at_once (size_t readbuf_size_) +{ +    zmq_assert (readbuf_size_ > 0); + +    //  Read max TSDU size without fragmentation. +    size_t max_tsdu_size = get_max_tsdu_size (); + +    //  Calculate number of APDUs needed to fill the reading buffer. +    size_t apdu_count = (int)readbuf_size_ / max_tsdu_size; + +    if ((int) readbuf_size_ % max_tsdu_size) +        apdu_count ++; + +    //  Have to have at least one APDU. +    zmq_assert (apdu_count); + +    return apdu_count; +} + +//  Allocate buffer for one packet from the transmit window, The memory buffer  +//  is owned by the transmit window and so must be returned to the window with  +//  content via pgm_transport_send() calls or unused with pgm_packetv_free1().  +void *zmq::pgm_socket_t::get_buffer (size_t *size_) +{ +    //  Store size. +    *size_ = get_max_tsdu_size (); + +    //  Allocate one packet. +    return pgm_packetv_alloc (g_transport, false); +} + +//  Return an unused packet allocated from the transmit window  +//  via pgm_packetv_alloc().  +void zmq::pgm_socket_t::free_buffer (void *data_) +{ +    pgm_packetv_free1 (g_transport, data_, false); +} + +//  pgm_transport_recvmsgv is called to fill the pgm_msgv array up to  +//  pgm_msgv_len. In subsequent calls data from pgm_msgv structure are  +//  returned. +ssize_t zmq::pgm_socket_t::receive (void **raw_data_) +{ +  +    //  We just sent all data from pgm_transport_recvmsgv up  +    //  and have to return 0 that another engine in this thread is scheduled. +    if (nbytes_rec == nbytes_processed && nbytes_rec > 0) { + +        //  Reset all the counters. +        nbytes_rec = 0; +        nbytes_processed = 0; +        pgm_msgv_processed = 0; + +        return 0; +    } + +    //  If we have are going first time or if we have processed all pgm_msgv_t +    //  structure previaously read fr | 
