diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/pgm_receiver.cpp | 148 | ||||
| -rw-r--r-- | src/pgm_receiver.hpp | 13 | ||||
| -rw-r--r-- | src/pgm_sender.cpp | 94 | ||||
| -rw-r--r-- | src/pgm_sender.hpp | 16 | ||||
| -rw-r--r-- | src/pgm_socket.cpp | 178 | ||||
| -rw-r--r-- | src/pgm_socket.hpp | 37 | 
6 files changed, 159 insertions, 327 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 1d4d695..0eb92d2 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -27,9 +27,6 @@  #include "windows.hpp"  #endif -#include <pgm/pgm.h> -#include <iostream> -  #include "pgm_receiver.hpp"  #include "err.hpp"  #include "stdint.hpp" @@ -58,20 +55,12 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)  void zmq::pgm_receiver_t::plug (i_inout *inout_)  { -    //  Allocate 2 fds one for socket second for waiting pipe. +    //  Retrieve PGM fds and start polling.      int socket_fd;      int waiting_pipe_fd; - -    //  Fill socket_fd and waiting_pipe_fd from PGM transport      pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd); - -    //  Add socket_fd into poller.      socket_handle = add_fd (socket_fd); - -    //  Add waiting_pipe_fd into poller.      pipe_handle = add_fd (waiting_pipe_fd); - -    //  Set POLLIN for both handlers.      set_pollin (pipe_handle);      set_pollin (socket_handle); @@ -81,15 +70,16 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_)  void zmq::pgm_receiver_t::unplug ()  {      //  Delete decoders. -    for (peer_t::iterator it = peers.begin (); it != peers.end (); it++) { +    for (peers_t::iterator it = peers.begin (); it != peers.end (); it++) {          if (it->second.decoder != NULL)              delete it->second.decoder;      } -      peers.clear (); +    //  Stop polling.      rm_fd (socket_handle);      rm_fd (pipe_handle); +      inout = NULL;  } @@ -98,101 +88,77 @@ void zmq::pgm_receiver_t::revive ()      zmq_assert (false);  } -//  POLLIN event from socket or waiting_pipe.  void zmq::pgm_receiver_t::in_event ()  { -    //  Iterator to peers map. -    peer_t::iterator it; - -    //  Data from PGM socket. -    unsigned char *raw_data = NULL; +    // Read data from the underlying pgm_socket. +    unsigned char *data = NULL;      const pgm_tsi_t *tsi = NULL; -    ssize_t nbytes = 0; - -    do { - -        // Read data from underlying pgm_socket. -        nbytes = pgm_socket.receive ((void**) &raw_data, &tsi); - -        //  No ODATA or RDATA. -        if (!nbytes) -            break; +    ssize_t received = pgm_socket.receive ((void**) &data, &tsi); -        //  Fid TSI in peers list. -        it = peers.find (*tsi); +    //  No data to process. This may happen if the packet received is +    //  neither ODATA nor ODATA. +    if (received == 0) +        return; -        //  Data loss. -        if (nbytes == -1) { +    //  Find the peer based on its TSI. +    peers_t::iterator it = peers.find (*tsi); -            zmq_assert (it != peers.end ()); - -            //  Delete decoder and set joined to false. -            it->second.joined = false; -             -            if (it->second.decoder != NULL) { -                delete it->second.decoder; -                it->second.decoder = NULL; -            } - -            break; +    //  Data loss. Delete decoder and mark the peer as disjoint. +    if (received == -1) { +        zmq_assert (it != peers.end ()); +        it->second.joined = false; +        if (it->second.decoder != NULL) { +            delete it->second.decoder; +            it->second.decoder = NULL;          } +        return; +    } -        // Read offset of the fist message in current APDU. -        zmq_assert ((size_t) nbytes >= sizeof (uint16_t)); -        uint16_t apdu_offset = get_uint16 (raw_data); +    //  New peer. Add it to the list of know but unjoint peers. +    if (it == peers.end ()) { +        peer_info_t peer_info = {false, NULL}; +        it = peers.insert (std::make_pair (*tsi, peer_info)).first; +    } -        // Shift raw_data & decrease nbytes by the first message offset  -        // information (sizeof uint16_t). -        raw_data +=  sizeof (uint16_t); -        nbytes -= sizeof (uint16_t); +    //  Read the offset of the fist message in the current packet. +    zmq_assert ((size_t) received >= sizeof (uint16_t)); +    uint16_t offset = get_uint16 (data); +    data += sizeof (uint16_t); +    received -= sizeof (uint16_t); -        //  New peer. -        if (it == peers.end ()) { -            peer_info_t peer_info = {false, NULL}; -            it = peers.insert (std::make_pair (*tsi, peer_info)).first; -        } +    //  Join the stream if needed. +    if (!it->second.joined) { -        //  There is not beginning of the message in current APDU and we -        //  are not joined jet -> throwing data. -        if (apdu_offset == 0xFFFF && !it->second.joined) { -            break; -        } +        //  There is no beginning of the message in current packet. +        //  Ignore the data. +        if (offset == 0xffff) +            return; -        //  Now is the possibility to join the stream. -        if (!it->second.joined) { -  -            zmq_assert (apdu_offset <= nbytes); -            zmq_assert (it->second.decoder == NULL); +        zmq_assert (offset <= received); +        zmq_assert (it->second.decoder == NULL); -            //  We have to move data to the begining of the first message. -            raw_data += apdu_offset; -            nbytes -= apdu_offset; +        //  We have to move data to the begining of the first message. +        data += offset; +        received -= offset; -            // Joined the stream. -            it->second.joined = true; +        //  Mark the stream as joined. +        it->second.joined = true; -            //  Create and connect decoder for joined peer. -            it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0); -            it->second.decoder->set_inout (inout); -        } - -        if (nbytes > 0) { -         -            //  Push all the data to the decoder. -            //  TODO: process_buffer may not process entire buffer! -            it->second.decoder->process_buffer (raw_data, nbytes); -        } +        //  Create and connect decoder for the peer. +        it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0); +        it->second.decoder->set_inout (inout); +    } -    } while (nbytes > 0); +    if (received) { -    //  Flush any messages decoder may have produced to the dispatcher. -    inout->flush (); - -} +        //  Push all the data to the decoder. +        //  TODO: process_buffer may not process entire buffer! +        size_t processed = it->second.decoder->process_buffer (data, received); +        zmq_assert (processed == received); -void zmq::pgm_receiver_t::out_event () -{ -    zmq_assert (false); +        //  Flush any messages decoder may have produced. +        inout->flush (); +    }  }  #endif diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 91169f4..6fadbc3 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -29,7 +29,7 @@  #endif  #include <map> -#include <pgm/pgm.h> +#include <algorithm>  #include "io_object.hpp"  #include "i_engine.hpp" @@ -45,8 +45,6 @@ namespace zmq      public: -        //  Creates gm_engine. Underlying PGM connection is initialised -        //  using network_ parameter.          pgm_receiver_t (class io_thread_t *parent_, const options_t &options_);          ~pgm_receiver_t (); @@ -59,11 +57,12 @@ namespace zmq          //  i_poll_events interface implementation.          void in_event (); -        void out_event ();      private: -        //  Map to hold TSI, joined and decoder for each peer. +        //  If joined is true we are already getting messages from the peer. +        //  It it's false, we are getting data but still we haven't seen +        //  beginning of a message.          struct peer_info_t          {              bool joined; @@ -84,8 +83,8 @@ namespace zmq              }          }; -        typedef std::map <pgm_tsi_t, peer_info_t, tsi_comp> peer_t; -        peer_t peers; +        typedef std::map <pgm_tsi_t, peer_info_t, tsi_comp> peers_t; +        peers_t peers;          //  PGM socket.          pgm_socket_t pgm_socket; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 880bb09..0d87ee0 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -25,12 +25,13 @@  #include "windows.hpp"  #endif -#include <iostream> +#include <stdlib.h>  #include "io_thread.hpp"  #include "pgm_sender.hpp"  #include "err.hpp"  #include "wire.hpp" +#include "stdint.hpp"  zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,         const options_t &options_) : @@ -38,18 +39,21 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,      encoder (0, false),      pgm_socket (false, options_),      options (options_), -    inout (NULL),      out_buffer (NULL),      out_buffer_size (0), -    write_size (0), -    write_pos (0),  -    first_message_offset (-1) +    write_size (0)  {  }  int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)  { -    return pgm_socket.init (udp_encapsulation_, network_); +    int rc = pgm_socket.init (udp_encapsulation_, network_); +    if (rc != 0) +        return rc; + +    out_buffer_size = pgm_socket.get_max_tsdu_size (); +    out_buffer = (unsigned char*) malloc (out_buffer_size); +    zmq_assert (out_buffer);  }  void zmq::pgm_sender_t::plug (i_inout *inout_) @@ -61,17 +65,11 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)      encoder.set_inout (inout_); -    //  Fill fds from PGM transport. -    pgm_socket.get_sender_fds  -        (&downlink_socket_fd, &uplink_socket_fd, &rdata_notify_fd); - -    //  Add downlink_socket_fd into poller. +    //  Fill fds from PGM transport and add them to the poller. +    pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, +        &rdata_notify_fd);      handle = add_fd (downlink_socket_fd); - -    //  Add uplink_socket_fd into the poller.      uplink_handle = add_fd (uplink_socket_fd); - -    //  Add rdata_notify_fd into the poller.      rdata_notify_handle = add_fd (rdata_notify_fd);         //  Set POLLIN. We wont never want to stop polling for uplink = we never @@ -81,8 +79,6 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)      //  Set POLLOUT for downlink_socket_handle.      set_pollout (handle); - -    inout = inout_;  }  void zmq::pgm_sender_t::unplug () @@ -91,7 +87,6 @@ void zmq::pgm_sender_t::unplug ()      rm_fd (uplink_handle);      rm_fd (rdata_notify_handle);      encoder.set_inout (NULL); -    inout = NULL;  }  void zmq::pgm_sender_t::revive () @@ -103,14 +98,14 @@ void zmq::pgm_sender_t::revive ()  zmq::pgm_sender_t::~pgm_sender_t ()  {      if (out_buffer) { -        pgm_socket.free_buffer (out_buffer); +        free (out_buffer);          out_buffer = NULL;      }  } -//  In event on sender side means NAK or SPMR receiving from some peer.  void zmq::pgm_sender_t::in_event ()  { +    //  In event on sender side means NAK or SPMR receiving from some peer.      pgm_socket.process_upstream ();  } @@ -118,55 +113,36 @@ void zmq::pgm_sender_t::out_event ()  {      //  POLLOUT event from send socket. If write buffer is empty,       //  try to read new data from the encoder. -    if (write_pos == write_size) { - -        //  Get buffer if we do not have already one. -        if (!out_buffer) { -            out_buffer = (unsigned char*)  -                pgm_socket.get_buffer (&out_buffer_size); -        } +    if (write_size == 0) { -        assert (out_buffer_size > 0); - -        //  First two bytes /sizeof (uint16_t)/ are used to store message  -        //  offset in following steps. +        //  First two bytes (sizeof uint16_t) are used to store message  +        //  offset in following steps. Note that by passing our buffer to +        //  the get data function we prevent it from returning its own buffer.          unsigned char *bf = out_buffer + sizeof (uint16_t); -        write_size = out_buffer_size - sizeof (uint16_t); -        encoder.get_data (&bf, &write_size, &first_message_offset); -        write_pos = 0; +        size_t bfsz = out_buffer_size - sizeof (uint16_t); +        int offset = -1; +        encoder.get_data (&bf, &bfsz, &offset);          //  If there are no data to write stop polling for output. -        if (!write_size) { +        if (!bfsz) {              reset_pollout (handle); -        } else { -            // Addning uint16_t for offset in a case when encoder returned > 0B. -            write_size += sizeof (uint16_t); +            return;          } -    } - -    //  If there are any data to write, write them into the socket. -    //  Note that all data has to written in one write_one_pkt_with_offset call. -    if (write_pos < write_size) { -        size_t nbytes = write_one_pkt_with_offset (out_buffer + write_pos,  -            write_size - write_pos, (uint16_t) first_message_offset); - -        //  We can write either all data or 0 which means rate limit reached. -        zmq_assert (write_size - write_pos == nbytes || nbytes == 0); -        write_pos += nbytes; +        //  Put offset information in the buffer. +        write_size = bfsz + sizeof (uint16_t); +        put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);      } -} -size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_,  -    size_t size_, uint16_t offset_) -{ -    //  Put offset information in the buffer. -    put_uint16 (data_, offset_); -    -    //  Send data. -    size_t nbytes = pgm_socket.send (data_, size_); +    //  Send the data. +    size_t nbytes = pgm_socket.send (out_buffer, write_size); -    return nbytes; +    //  We can write either all data or 0 which means rate limit reached. +    if (nbytes == write_size) +        write_size = 0; +    else +        zmq_assert (nbytes == 0);  } +  #endif diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 30b545d..e280843 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -42,6 +42,7 @@ namespace zmq      {      public: +          pgm_sender_t (class io_thread_t *parent_, const options_t &options_);          ~pgm_sender_t (); @@ -58,12 +59,6 @@ namespace zmq      private: -        //  Send one APDU with first message offset information.  -        //  Note that first 2 bytes in data_ are used to store the offset_ -        //  and thus user data has to start at data_ + sizeof (uint16_t). -        size_t write_one_pkt_with_offset (unsigned char *data_, size_t size_, -            uint16_t offset_); -          //  Message encoder.          zmq_encoder_t encoder; @@ -78,20 +73,15 @@ namespace zmq          handle_t uplink_handle;          handle_t rdata_notify_handle; -        //  Parent session. -        i_inout *inout; -          //  Output buffer from pgm_socket.          unsigned char *out_buffer;          //  Output buffer size.          size_t out_buffer_size; +        //  Number of bytes in the buffer to be written to the socket. +        //  If zero, there are no data to be sent.          size_t write_size; -        size_t write_pos; - -        //  Offset of the first mesage in data chunk taken from encoder. -        int first_message_offset;          pgm_sender_t (const pgm_sender_t&);          void operator = (const pgm_sender_t&); diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index c3802b4..72698cf 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -31,34 +31,32 @@  #define CONFIG_HAVE_POLL  #endif -#include <pgm/pgm.h> +#include <stdlib.h>  #include <string> -#include <iostream>  #include "options.hpp"  #include "pgm_socket.hpp"  #include "config.hpp"  #include "err.hpp"  #include "uuid.hpp" +#include "stdint.hpp"  zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :      transport (NULL),      options (options_),      receiver (receiver_), -    port_number (0), -    udp_encapsulation (false),      pgm_msgv (NULL),      nbytes_rec (0),      nbytes_processed (0),      pgm_msgv_processed (0),      pgm_msgv_len (0)  { -      }  int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)  { -    udp_encapsulation = udp_encapsulation_; +    //  Can not open transport before destroying old one.  +    zmq_assert (transport == NULL);      //  Parse port number.      const char *port_delim = strchr (network_, ':'); @@ -67,44 +65,21 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)          return -1;      } -    port_number = atoi (port_delim + 1); +    uint16_t port_number = atoi (port_delim + 1); +    char network [256];      if (port_delim - network_ >= (int) sizeof (network) - 1) {          errno = EINVAL;          return -1;      } -      memset (network, '\0', sizeof (network));      memcpy (network, network_, port_delim - network_); - -    //  Open PGM transport. -    int rc = open_transport (); -    if (rc != 0) -        return -1; - -    //  For receiver transport preallocate pgm_msgv array. -    //  in_batch_size configured in confing.hpp -    if (receiver) { -        pgm_msgv_len = get_max_apdu_at_once (in_batch_size); -        //  TODO: use malloc instead of new -        pgm_msgv = new pgm_msgv_t [pgm_msgv_len]; -    } - -    return 0; -} - -int zmq::pgm_socket_t::open_transport () -{ -    //  Can not open transport before destroying old one.  -    zmq_assert (transport == NULL); - +          //  Zero counter used in msgrecv.      nbytes_rec = 0;      nbytes_processed = 0;      pgm_msgv_processed = 0; -    //  TODO: Converting bool to int? Not nice. -    int pgm_ok = true;      GError *pgm_error = NULL;      //  Init PGM transport. @@ -135,8 +110,7 @@ int zmq::pgm_socket_t::open_transport ()      }      rc = pgm_gsi_create_from_string (&gsi, gsi_base.c_str (), -1); - -    if (rc != pgm_ok) { +    if (rc != TRUE) {          errno = EINVAL;          return -1;      } @@ -167,7 +141,7 @@ int zmq::pgm_socket_t::open_transport ()      res->ti_dport = port_number;      //  If we are using UDP encapsulation update gsr or res.  -    if (udp_encapsulation) { +    if (udp_encapsulation_) {          res->ti_udp_encap_ucast_port = port_number;          res->ti_udp_encap_mcast_port = port_number;      } @@ -192,14 +166,14 @@ int zmq::pgm_socket_t::open_transport ()      //  Set maximum transport protocol data unit size (TPDU).      rc = pgm_transport_set_max_tpdu (transport, pgm_max_tpdu); -    if (rc != pgm_ok) { +    if (rc != TRUE) {          errno = EINVAL;          return -1;      }      //  Set maximum number of network hops to cross.      rc = pgm_transport_set_hops (transport, 16); -    if (rc != pgm_ok) { +    if (rc != TRUE) {          errno = EINVAL;          return -1;      } @@ -212,46 +186,45 @@ int zmq::pgm_socket_t::open_transport ()      if (receiver) { -         //  Receiver transport. +        //  Receiver transport. -        //  Set transport->can_send_data = FALSE.          //  Note that NAKs are still generated by the transport.          rc = pgm_transport_set_recv_only (transport, true, false); -        zmq_assert (rc == pgm_ok); +        zmq_assert (rc == TRUE);          if (options.rcvbuf) {              rc = pgm_transport_set_rcvbuf (transport, (int) options.rcvbuf); -            if (rc != pgm_ok) +            if (rc != TRUE)                  return -1;          }          //  Set NAK transmit back-off interval [us].          rc = pgm_transport_set_nak_bo_ivl (transport, 50 * 1000); -        zmq_assert (rc == pgm_ok); +        zmq_assert (rc == TRUE);          //  Set timeout before repeating NAK [us].          rc = pgm_transport_set_nak_rpt_ivl (transport, 200 * 1000); -        zmq_assert (rc == pgm_ok); +        zmq_assert (rc == TRUE);          //  Set timeout for receiving RDATA.          rc = pgm_transport_set_nak_rdata_ivl (transport, 200 * 1000); -        zmq_assert (rc == pgm_ok); +        zmq_assert (rc == TRUE);          //  Set retries for NAK without NCF/DATA (NAK_DATA_RETRIES).          rc = pgm_transport_set_nak_data_retries (transport, 5); -        zmq_assert (rc == pgm_ok); +        zmq_assert (rc == TRUE);          //  Set retries for NCF after NAK (NAK_NCF_RETRIES).          rc = pgm_transport_set_nak_ncf_retries (transport, 2); -        zmq_assert (rc == pgm_ok); +        zmq_assert (rc == TRUE);          //  Set timeout for removing a dead peer [us].          rc = pgm_transport_set_peer_expiry (transport, 5 * 8192 * 1000); -        zmq_assert (rc == pgm_ok); +        zmq_assert (rc == TRUE);          //  Set expiration time of SPM Requests [us].          rc = pgm_transport_set_spmr_expiry (transport, 25 * 1000); -        zmq_assert (rc == pgm_ok); +        zmq_assert (rc == TRUE);          //  Set the size of the receive window.          //  Data rate is in [B/s]. options.rate is in [kb/s]. @@ -261,7 +234,7 @@ int zmq::pgm_socket_t::open_transport ()          }          rc = pgm_transport_set_rxw_max_rte (transport,               options.rate * 1000 / 8); -        if (rc != pgm_ok) { +        if (rc != TRUE) {              errno = EINVAL;              return -1;          } @@ -272,7 +245,7 @@ int zmq::pgm_socket_t::open_transport ()              return -1;          }          rc = pgm_transport_set_rxw_secs (transport, options.recovery_ivl); -        if (rc != pgm_ok) { +        if (rc != TRUE) {              errno = EINVAL;              return -1;          } @@ -281,13 +254,13 @@ int zmq::pgm_socket_t::open_transport ()          //  Sender transport. -        //  Set transport->can_recv = FALSE, waiting_pipe will not be read. +        //  Waiting pipe won't be read.          rc = pgm_transport_set_send_only (transport, TRUE); -        zmq_assert (rc == pgm_ok); +        zmq_assert (rc == TRUE);          if (options.sndbuf) {              rc = pgm_transport_set_sndbuf (transport, (int) options.sndbuf); -            if (rc != pgm_ok) +            if (rc != TRUE)                  return -1;          } @@ -299,7 +272,7 @@ int zmq::pgm_socket_t::open_transport ()          }          rc = pgm_transport_set_txw_max_rte (transport,               options.rate * 1000 / 8); -        if (rc != pgm_ok) { +        if (rc != TRUE) {              errno = EINVAL;              return -1;          } @@ -310,14 +283,14 @@ int zmq::pgm_socket_t::open_transport ()              return -1;          }          rc = pgm_transport_set_txw_secs (transport, options.recovery_ivl); -        if (rc != pgm_ok) { +        if (rc != TRUE) {              errno = EINVAL;              return -1;          }          //  Set interval of background SPM packets [us].          rc = pgm_transport_set_ambient_spm (transport, 8192 * 1000); -        zmq_assert (rc == pgm_ok); +        zmq_assert (rc == TRUE);          //  Set intervals of data flushing SPM packets [us].          guint spm_heartbeat[] = {4 * 1000, 4 * 1000, 8 * 1000, 16 * 1000,  @@ -325,13 +298,13 @@ int zmq::pgm_socket_t::open_transport ()              1024 * 1000, 2048 * 1000, 4096 * 1000, 8192 * 1000};          rc = pgm_transport_set_heartbeat_spm (transport, spm_heartbeat,               G_N_ELEMENTS(spm_heartbeat)); -        zmq_assert (rc == pgm_ok); +        zmq_assert (rc == TRUE);      }      //  Enable multicast loopback.      if (options.use_multicast_loop) {          rc = pgm_transport_set_multicast_loop (transport, true); -        zmq_assert (rc == pgm_ok); +        zmq_assert (rc == TRUE);      }      //  Bind a transport to the specified network devices. @@ -354,28 +327,28 @@ int zmq::pgm_socket_t::open_transport ()          zmq_assert (false);      } +    //  For receiver transport preallocate pgm_msgv array. +    //  TODO: ? +    if (receiver) { +        zmq_assert (in_batch_size > 0); +        size_t max_tsdu_size = get_max_tsdu_size (); +        pgm_msgv_len = (int) in_batch_size / max_tsdu_size; +        if ((int) in_batch_size % max_tsdu_size) +            pgm_msgv_len++; +        zmq_assert (pgm_msgv_len); + +        pgm_msgv = (pgm_msgv_t*) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len); +    } +      return 0;  }  zmq::pgm_socket_t::~pgm_socket_t ()  { -    //  Celanup. -    if (pgm_msgv) { -        delete [] pgm_msgv; -    } - -    if (transport) -        close_transport (); -} - -void zmq::pgm_socket_t::close_transport () -{    -    //  transport has to be valid. -    zmq_assert (transport); - -    pgm_transport_destroy (transport, TRUE); - -    transport = NULL; +    if (pgm_msgv) +        free (pgm_msgv); +    if (transport)  +        pgm_transport_destroy (transport, TRUE);  }  //   Get receiver fds. recv_fd is from transport->recv_sock @@ -401,7 +374,7 @@ void zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_,  //  Get fds and store them into user allocated memory.   //  sender_fd is from pgm_transport->send_sock.  //  receive_fd_ is from  transport->recv_sock. -//  rdata_notify_fd_ is from transport->rdata_notify (PGM2 only). +//  rdata_notify_fd_ is from transport->rdata_notify.  void zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,       int *rdata_notify_fd_)  { @@ -445,52 +418,9 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)  //  Return max TSDU size without fragmentation from current PGM transport.  size_t zmq::pgm_socket_t::get_max_tsdu_size ()  { -    return (size_t)pgm_transport_max_tsdu (transport, false); -} - -//  Returns how many APDUs are needed to fill reading buffer. -size_t zmq::pgm_socket_t::get_max_apdu_at_once (size_t readbuf_size_) -{ -    zmq_assert (readbuf_size_ > 0); - -    //  Read max TSDU size without fragmentation. -    size_t max_tsdu_size = get_max_tsdu_size (); - -    //  Calculate number of APDUs needed to fill the reading buffer. -    size_t apdu_count = (int)readbuf_size_ / max_tsdu_size; - -    if ((int) readbuf_size_ % max_tsdu_size) -        apdu_count ++; - -    //  Have to have at least one APDU. -    zmq_assert (apdu_count); - -    return apdu_count; -} - -//  Allocate buffer for one packet from the transmit window, The memory buffer  -//  is owned by the transmit window and so must be returned to the window with  -//  content via pgm_transport_send() calls or unused with pgm_packetv_free1().  -void *zmq::pgm_socket_t::get_buffer (size_t *size_) -{ -    //  Store size. -    *size_ = get_max_tsdu_size (); - -    //  Allocate buffer. -    //  TODO: use malloc instead of new -    unsigned char *apdu_buff = new unsigned char [*size_];  -    zmq_assert (apdu_buff); -    return apdu_buff; -} - -//  Return an unused packet allocated from the transmit window  -//  via pgm_packetv_alloc().  -void zmq::pgm_socket_t::free_buffer (void *data_) -{ -    delete [] (unsigned char*) data_; +    return (size_t) pgm_transport_max_tsdu (transport, false);  } -  //  pgm_transport_recvmsgv is called to fill the pgm_msgv array up to   //  pgm_msgv_len. In subsequent calls data from pgm_msgv structure are   //  returned. @@ -525,9 +455,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)          const PGMIOStatus status = pgm_recvmsgv (transport, pgm_msgv,              pgm_msgv_len, MSG_DONTWAIT, &nbytes_rec, &pgm_error); -        if (status == PGM_IO_STATUS_ERROR) { -            zmq_assert (false); -        } +        zmq_assert (status != PGM_IO_STATUS_ERROR);          //  In a case when no ODATA/RDATA fired POLLIN event (SPM...)          //  pgm_recvmsg returns ?. @@ -591,9 +519,7 @@ void zmq::pgm_socket_t::process_upstream ()      PGMIOStatus status = pgm_recvmsgv (transport, &dummy_msg,          1, MSG_DONTWAIT, &dummy_bytes, &pgm_error); -    if (status == PGM_IO_STATUS_ERROR) { -        zmq_assert (false); -    } +    zmq_assert (status != PGM_IO_STATUS_ERROR);      //  No data should be returned.      zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING ||  diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index b1d0718..3b2740c 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -30,7 +30,6 @@  #include <pgm/pgm.h> -#include "stdint.hpp"  #include "options.hpp"  namespace zmq @@ -62,11 +61,8 @@ namespace zmq          //  Send data as one APDU, transmit window owned memory.          size_t send (unsigned char *data_, size_t data_len_); -        //  Allocates one slice for packet in tx window. -        void *get_buffer (size_t *size_); - -        //  Fees memory allocated by get_buffer. -        void free_buffer (void *data_); +        //  Returns max tsdu size without fragmentation. +        size_t get_max_tsdu_size ();          //  Receive data from pgm socket.          ssize_t receive (void **data_, const pgm_tsi_t **tsi_); @@ -76,21 +72,9 @@ namespace zmq          void process_upstream ();      private: - -        //  Open PGM transport. -        int open_transport (); - -        //  Close transport. -        void close_transport ();          //  OpenPGM transport          pgm_transport_t* transport; -         -        //  Returns max tsdu size without fragmentation. -        size_t get_max_tsdu_size (); - -        //  Returns maximum count of apdus which fills readbuf_size_ -        size_t get_max_apdu_at_once (size_t readbuf_size_);          //  Associated socket options.          options_t options; @@ -98,19 +82,13 @@ namespace zmq          //  true when pgm_socket should create receiving side.          bool receiver; -        //  TIBCO Rendezvous format network info. -        char network [256]; - -        //  PGM transport port number. -        uint16_t port_number; - -        //  If we are using UDP encapsulation. -        bool udp_encapsulation; - -        //  Array of pgm_msgv_t structures to store received data  +        //  Array of pgm_msgv_t structures to store received data          //  from the socket (pgm_transport_recvmsgv).          pgm_msgv_t *pgm_msgv; +        //  Size of pgm_msgv array. +        size_t pgm_msgv_len; +          // How many bytes were read from pgm socket.          size_t nbytes_rec; @@ -119,9 +97,6 @@ namespace zmq          //  How many messages from pgm_msgv were already sent up.          size_t pgm_msgv_processed; - -        //  Size of pgm_msgv array. -        size_t pgm_msgv_len;      };  }  #endif  | 
