diff options
| -rw-r--r-- | src/pgm_receiver.cpp | 21 | ||||
| -rw-r--r-- | src/pgm_receiver.hpp | 10 | ||||
| -rw-r--r-- | src/pgm_sender.cpp | 33 | ||||
| -rw-r--r-- | src/pgm_socket.cpp | 92 | ||||
| -rw-r--r-- | src/pgm_socket.hpp | 24 | 
5 files changed, 24 insertions, 156 deletions
| diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 5de98b7..aaccd0a 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -34,20 +34,6 @@  #include "wire.hpp"  #include "i_inout.hpp" -//#define PGM_RECEIVER_DEBUG -//#define PGM_RECEIVER_DEBUG_LEVEL 1 - -// level 1 = key behaviour -// level 2 = processing flow -// level 4 = infos - -#ifndef PGM_RECEIVER_DEBUG -#   define zmq_log(n, ...)  while (0) -#else -#   define zmq_log(n, ...)    do { if ((n) <= PGM_RECEIVER_DEBUG_LEVEL) \ -        { printf (__VA_ARGS__);}} while (0) -#endif -  zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,         const options_t &options_, const char *session_name_) :      io_object_t (parent_), @@ -161,12 +147,8 @@ void zmq::pgm_receiver_t::in_event ()          //  New peer.          if (it == peers.end ()) { -              peer_info_t peer_info = {false, NULL};              it = peers.insert (std::make_pair (*tsi, peer_info)).first; - -            zmq_log (1, "New peer TSI: %s, %s(%i).\n", pgm_tsi_print (tsi), -                __FILE__, __LINE__);          }          //  There is not beginning of the message in current APDU and we @@ -191,9 +173,6 @@ void zmq::pgm_receiver_t::in_event ()              //  Create and connect decoder for joined peer.              it->second.decoder = new zmq_decoder_t (0);              it->second.decoder->set_inout (inout); - -            zmq_log (1, "Peer %s joined into the stream, %s(%i)\n",  -                pgm_tsi_print (tsi), __FILE__, __LINE__);          }          if (nbytes > 0) { diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index f92f1c5..fa84acb 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -37,7 +37,6 @@  #include "zmq_decoder.hpp"  #include "pgm_socket.hpp" -  namespace zmq  { @@ -66,13 +65,16 @@ namespace zmq      private:          //  Map to hold TSI, joined and decoder for each peer. -        struct peer_info_t { +        struct peer_info_t +        {              bool joined;              zmq_decoder_t *decoder;          }; -        struct tsi_comp { -            bool operator () (const pgm_tsi_t <si, const pgm_tsi_t &rtsi) const +        struct tsi_comp +        { +            inline bool operator () (const pgm_tsi_t <si, +                const pgm_tsi_t &rtsi) const              {                  if (ltsi.sport < rtsi.sport)                      return true; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 7686286..19fc0e2 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -32,20 +32,6 @@  #include "err.hpp"  #include "wire.hpp" -//#define PGM_SENDER_DEBUG -//#define PGM_SENDER_DEBUG_LEVEL 1 - -// level 1 = key behaviour -// level 2 = processing flow -// level 4 = infos - -#ifndef PGM_SENDER_DEBUG -#   define zmq_log(n, ...)  while (0) -#else -#   define zmq_log(n, ...)    do { if ((n) <= PGM_SENDER_DEBUG_LEVEL) \ -        { printf (__VA_ARGS__);}} while (0) -#endif -  zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,         const options_t &options_, const char *session_name_) :      io_object_t (parent_), @@ -119,9 +105,6 @@ void zmq::pgm_sender_t::revive ()  zmq::pgm_sender_t::~pgm_sender_t ()  { -    zmq_log (4, "pgm_sender_t destructor, %s(%i)\n", -        __FILE__, __LINE__); -      if (out_buffer) {          pgm_socket.free_buffer (out_buffer);          out_buffer = NULL; @@ -171,17 +154,8 @@ void zmq::pgm_sender_t::out_event ()          size_t nbytes = write_one_pkt_with_offset (out_buffer + write_pos,               write_size - write_pos, (uint16_t) first_message_offset); -        //  We can write all data or 0 which means rate limit reached. -        if (write_size - write_pos != nbytes && nbytes != 0) { -            zmq_log (2, "write_size - write_pos %i, nbytes %i, %s(%i)", -                (int)(write_size - write_pos), (int)nbytes, __FILE__, __LINE__); -            assert (false); -        } - -        //  PGM rate limit reached nbytes is 0. -        if (!nbytes) { -            zmq_log (1, "pgm rate limit reached, %s(%i)\n", __FILE__, __LINE__); -        } +        //  We can write either all data or 0 which means rate limit reached. +        zmq_assert (write_size - write_pos == nbytes || nbytes == 0);          write_pos += nbytes;      } @@ -191,9 +165,6 @@ void zmq::pgm_sender_t::out_event ()  size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_,       size_t size_, uint16_t offset_)  { -    zmq_log (4, "data_size %i, first message offset %i, %s(%i)\n", -        (int) size_, offset_, __FILE__, __LINE__); -      //  Put offset information in the buffer.      put_uint16 (data_, offset_); diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 60712f4..3b67b76 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -41,20 +41,6 @@  #include "err.hpp"  #include "uuid.hpp" -//#define PGM_SOCKET_DEBUG -//#define PGM_SOCKET_DEBUG_LEVEL 4 - -// level 1 = key behaviour -// level 2 = processing flow -// level 4 = infos - -#ifndef PGM_SOCKET_DEBUG -#   define zmq_log(n, ...)  while (0) -#else -#   define zmq_log(n, ...)    do { if ((n) <= PGM_SOCKET_DEBUG_LEVEL) \ -        { printf (__VA_ARGS__);}} while (0) -#endif -  zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :      transport (NULL),      options (options_), @@ -91,10 +77,6 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)      memset (network, '\0', sizeof (network));      memcpy (network, network_, port_delim - network_); -    zmq_log (1, "parsed: network  %s, port %i, udp encaps. %s, %s(%i)\n",  -        network, port_number, udp_encapsulation ? "yes" : "no", -        __FILE__, __LINE__); -      //  Open PGM transport.      int rc = open_transport ();      if (rc != 0) @@ -105,20 +87,13 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)      if (receiver) {          pgm_msgv_len = get_max_apdu_at_once (in_batch_size);          pgm_msgv = new pgm_msgv_t [pgm_msgv_len]; -        zmq_log (1, "PGM transport: pgm_msgv_len %i, %s(%i)\n",  -            (int) pgm_msgv_len, __FILE__, __LINE__);      }      return 0;  } -int zmq::pgm_socket_t::open_transport (void) +int zmq::pgm_socket_t::open_transport ()  { - -    zmq_log (1, "Opening PGM: network  %s, port %i, udp encaps. %s, %s(%i)\n", -        network, port_number, udp_encapsulation ? "yes" : "no",  -        __FILE__, __LINE__); -      //  Can not open transport before destroying old one.       zmq_assert (transport == NULL); @@ -193,8 +168,6 @@ int zmq::pgm_socket_t::open_transport (void)      pgm_if_free_transport_info (res); -    zmq_log (1, "PGM transport created, %s(%i)\n", __FILE__, __LINE__); -      //  Common parameters for receiver and sender.      //  Set maximum transport protocol data unit size (TPDU). @@ -211,13 +184,11 @@ int zmq::pgm_socket_t::open_transport (void)          return -1;      } -#ifdef ZMQ_HAVE_OPENPGM2      //  Set nonblocking send/recv sockets.      if (!pgm_transport_set_nonblocking (transport, true)) {          errno = EINVAL;          return -1;      } -#endif      if (receiver) { @@ -338,8 +309,6 @@ int zmq::pgm_socket_t::open_transport (void)          return -1;      } -    zmq_log (1, "PGM transport bound, %s(%i)\n", __FILE__, __LINE__); -      return 0;  } @@ -354,7 +323,7 @@ zmq::pgm_socket_t::~pgm_socket_t ()          close_transport ();  } -void zmq::pgm_socket_t::close_transport (void) +void zmq::pgm_socket_t::close_transport ()  {         //  transport has to be valid.      zmq_assert (transport); @@ -366,7 +335,7 @@ void zmq::pgm_socket_t::close_transport (void)  //   Get receiver fds. recv_fd is from transport->recv_sock  //   waiting_pipe_fd is from transport->waiting_pipe [0] -int zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_,  +void zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_,       int *waiting_pipe_fd_)  {      zmq_assert (receive_fd_); @@ -382,15 +351,13 @@ int zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_,      //  Take FDs directly from transport.      *receive_fd_ = pgm_transport_get_recv_fd (transport);      *waiting_pipe_fd_ = pgm_transport_get_pending_fd (transport); - -    return pgm_receiver_fd_count;  }  //  Get fds and store them into user allocated memory.   //  sender_fd is from pgm_transport->send_sock.  //  receive_fd_ is from  transport->recv_sock.  //  rdata_notify_fd_ is from transport->rdata_notify (PGM2 only). -int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,  +void zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,       int *rdata_notify_fd_)  {      zmq_assert (send_fd_); @@ -409,8 +376,6 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,      *receive_fd_ = pgm_transport_get_recv_fd (transport);      *rdata_notify_fd_ = pgm_transport_get_repair_fd (transport);      *send_fd_ = pgm_transport_get_send_fd (transport); - -    return pgm_sender_fd_count;  }  //  Send one APDU, transmit window owned memory. @@ -421,28 +386,19 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)      PGMIOStatus status = pgm_send (transport, data_, data_len_, &nbytes);      if (nbytes != data_len_) { -        zmq_log (1, "status %i, data_len %i, wrote %iB, %s(%i)\n",  -            (int) status, (int) data_len_, (int) nbytes, __FILE__, __LINE__); -          zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED);          zmq_assert (nbytes == 0);      } - -    zmq_log (4, "wrote %i/%iB, %s(%i)\n", (int) nbytes, (int) data_len_, -        __FILE__, __LINE__);      // We have to write all data as one packet. -    if (nbytes > 0) { -        zmq_log (1, "data sent %iB, %s(%i)\n", (int) nbytes,  -            __FILE__, __LINE__); +    if (nbytes > 0)          zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_); -    }      return nbytes;  }  //  Return max TSDU size without fragmentation from current PGM transport. -size_t zmq::pgm_socket_t::get_max_tsdu_size (void) +size_t zmq::pgm_socket_t::get_max_tsdu_size ()  {      return (size_t)pgm_transport_max_tsdu (transport, false);  } @@ -494,7 +450,6 @@ void zmq::pgm_socket_t::free_buffer (void *data_)  //  returned.  ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)  { -      size_t raw_data_len = 0;      //  We just sent all data from pgm_transport_recvmsgv up  @@ -505,7 +460,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)          nbytes_rec = 0;          nbytes_processed = 0;          pgm_msgv_processed = 0; -          return 0;      } @@ -525,11 +479,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)          const PGMIOStatus status = pgm_recvmsgv (transport, pgm_msgv,              pgm_msgv_len, MSG_DONTWAIT, &nbytes_rec, &pgm_error); -        if (nbytes_rec > 0) { -            zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n",  -                status, (int) nbytes_rec, __FILE__, __LINE__); -        } -          //  In a case when no ODATA/RDATA fired POLLIN event (SPM...)          //  pgm_recvmsg returns ?.          if (status == PGM_IO_STATUS_TIMER_PENDING) { @@ -545,17 +494,10 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)          //  Data loss.          if (status == PGM_IO_STATUS_RESET) { -            zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n",  -                status, (int) nbytes_rec, __FILE__, __LINE__); -              pgm_peer_t* peer = (pgm_peer_t*) transport->peers_pending->data;              //  Save lost data TSI.              *tsi_ = &peer->tsi; - -            zmq_log (1, "Data loss detected %s, %s(%i)\n", pgm_tsi_print (&peer->tsi), -                __FILE__, __LINE__); -              nbytes_rec = 0;              //  In case of dala loss -1 is returned. @@ -563,18 +505,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)          }          //  Catch the rest of the errors.  -        if (status != PGM_IO_STATUS_NORMAL) { -            zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n",  -                status, (int) nbytes_rec, __FILE__, __LINE__); - -            zmq_assert (false); - -            nbytes_rec = 0; -            return -1; -        } - -        zmq_log (4, "received %i bytes\n", (int)nbytes_rec); - +        zmq_assert (status == PGM_IO_STATUS_NORMAL);      }      zmq_assert (nbytes_rec > 0); @@ -596,15 +527,11 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)      pgm_msgv_processed++;      nbytes_processed +=raw_data_len; -    zmq_log (4, "sendig up %i bytes\n", (int)raw_data_len); -      return raw_data_len;  } -void zmq::pgm_socket_t::process_upstream (void) +void zmq::pgm_socket_t::process_upstream ()  { -    zmq_log (1, "On upstream packet, %s(%i)\n", __FILE__, __LINE__); -      pgm_msgv_t dummy_msg;      size_t dummy_bytes = 0; @@ -613,9 +540,6 @@ void zmq::pgm_socket_t::process_upstream (void)      PGMIOStatus status = pgm_recvmsgv (transport, &dummy_msg,          1, MSG_DONTWAIT, &dummy_bytes, &pgm_error); -    zmq_log (1, "upstream status %i, nbytes %i, %s(%i)\n", -        (int) status, (int) dummy_bytes, __FILE__, __LINE__); -      //  No data should be returned.      zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING ||           status == PGM_IO_STATUS_RATE_LIMITED)); diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index d89d19c..ea37004 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -52,18 +52,18 @@ namespace zmq          int init (bool udp_encapsulation_, const char *network_);          //  Open PGM transport. Parameters are the same as in constructor. -        int open_transport (void); +        int open_transport ();          //  Close transport. -        void close_transport (void); +        void close_transport ();          //   Get receiver fds and store them into user allocated memory. -        int get_receiver_fds (int *receive_fd_, int *waiting_pipe_fd_); +        void get_receiver_fds (int *receive_fd_, int *waiting_pipe_fd_);          //   Get sender and receiver fds and store it to user allocated           //   memory. Receive fd is used to process NAKs from peers. -        int get_sender_fds (int *send_fd_, int *receive_fd_, -            int *rdata_notify_fd_ = NULL); +        void get_sender_fds (int *send_fd_, int *receive_fd_, +            int *rdata_notify_fd_);          //  Send data as one APDU, transmit window owned memory.          size_t send (unsigned char *data_, size_t data_len_); @@ -79,17 +79,15 @@ namespace zmq          //  POLLIN on sender side should mean NAK or SPMR receiving.           //  process_upstream function is used to handle such a situation. -        void process_upstream (void); +        void process_upstream (); -    protected: +    private:          //  OpenPGM transport          pgm_transport_t* transport; - -    private:          //  Returns max tsdu size without fragmentation. -        size_t get_max_tsdu_size (void); +        size_t get_max_tsdu_size ();          //  Returns maximum count of apdus which fills readbuf_size_          size_t get_max_apdu_at_once (size_t readbuf_size_); @@ -124,12 +122,6 @@ namespace zmq          //  Size of pgm_msgv array.          size_t pgm_msgv_len; - -        //  Sender transport uses 2 fd. -        enum {pgm_sender_fd_count = 3}; - -        //  Receiver transport uses 2 fd. -        enum {pgm_receiver_fd_count = 2};      };  }  #endif | 
