diff options
| -rw-r--r-- | src/pgm_receiver.cpp | 2 | ||||
| -rw-r--r-- | src/pgm_sender.cpp | 26 | ||||
| -rw-r--r-- | src/pgm_sender.hpp | 3 | ||||
| -rw-r--r-- | src/pgm_socket.cpp | 85 | ||||
| -rw-r--r-- | src/pgm_socket.hpp | 8 | 
5 files changed, 100 insertions, 24 deletions
| diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 50a8ff9..7af6ed5 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -153,7 +153,6 @@ void zmq::pgm_receiver_t::in_event ()          // information (sizeof uint16_t).          raw_data +=  sizeof (uint16_t);          nbytes -= sizeof (uint16_t); -        zmq_assert (apdu_offset <= nbytes);          //  New peer.          if (it == peers.end ()) { @@ -174,6 +173,7 @@ void zmq::pgm_receiver_t::in_event ()          //  Now is the possibility to join the stream.          if (!it->second.joined) { +            zmq_assert (apdu_offset <= nbytes);              zmq_assert (it->second.decoder == NULL);              //  We have to move data to the begining of the first message. diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 423865b..51dfbf1 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -68,13 +68,21 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)  {      //  Alocate 2 fds for PGM socket. -    int downlink_socket_fd; -    int uplink_socket_fd; +    int downlink_socket_fd = 0; +    int uplink_socket_fd = 0; +#ifdef ZMQ_HAVE_OPENPGM2 +    int rdata_notify_fd = 0; +#endif      encoder.set_inout (inout_);      //  Fill fds from PGM transport. +#ifdef ZMQ_HAVE_OPENPGM1      pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd); +#elif ZMQ_HAVE_OPENPGM2 +    pgm_socket.get_sender_fds  +        (&downlink_socket_fd, &uplink_socket_fd, &rdata_notify_fd); +#endif      //  Add downlink_socket_fd into poller.      handle = add_fd (downlink_socket_fd); @@ -82,9 +90,17 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)      //  Add uplink_socket_fd into the poller.      uplink_handle = add_fd (uplink_socket_fd); +    //  Add rdata_notify_fd into the poller. +#ifdef ZMQ_HAVE_OPENPGM2 +    rdata_notify_handle = add_fd (rdata_notify_fd);    +#endif +      //  Set POLLIN. We wont never want to stop polling for uplink = we never      //  want to stop porocess NAKs.      set_pollin (uplink_handle); +#ifdef ZMQ_HAVE_OPENPGM2 +    set_pollin (rdata_notify_handle); +#endif      //  Set POLLOUT for downlink_socket_handle.      set_pollout (handle); @@ -96,6 +112,9 @@ void zmq::pgm_sender_t::unplug ()  {      rm_fd (handle);      rm_fd (uplink_handle); +#ifdef ZMQ_HAVE_OPENPGM2 +    rm_fd (rdata_notify_handle); +#endif      encoder.set_inout (NULL);      inout = NULL;  } @@ -167,11 +186,12 @@ void zmq::pgm_sender_t::out_event ()              zmq_log (1, "pgm rate limit reached, %s(%i)\n", __FILE__, __LINE__);          } +#ifdef ZMQ_HAVE_OPENPGM1          //  After sending data slice is owned by tx window.          if (nbytes) {              out_buffer = NULL;          } - +#endif          write_pos += nbytes;      } diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 8fdda6c..380e80b 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -76,6 +76,9 @@ namespace zmq          //  Poll handle associated with PGM socket.          handle_t handle;          handle_t uplink_handle; +#ifdef ZMQ_HAVE_OPENPGM2 +        handle_t rdata_notify_handle; +#endif          //  Parent session.          i_inout *inout; diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 65a80a5..dbfe52d 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -39,7 +39,7 @@  #include "uuid.hpp"  //#define PGM_SOCKET_DEBUG -//#define PGM_SOCKET_DEBUG_LEVEL 1 +//#define PGM_SOCKET_DEBUG_LEVEL 4  // level 1 = key behaviour  // level 2 = processing flow @@ -275,7 +275,7 @@ int zmq::pgm_socket_t::open_transport (void)          //  Set transport->can_send_data = FALSE.          //  Note that NAKs are still generated by the transport. -        rc = pgm_transport_set_recv_only (g_transport, false); +        rc = pgm_transport_set_recv_only (g_transport, true, false);          if (rc != pgm_ok) {              errno = EINVAL;              return -1; @@ -511,8 +511,18 @@ int zmq::pgm_socket_t::get_receiver_fds (int *recv_fd_,  //   Get fds and store them into user allocated memory.   //   sender_fd is from pgm_transport->send_sock.  //   receive_fd_ is from  transport->recv_sock. -int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_) +int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,  +    int *rdata_notify_fd_)  { +#ifdef ZMQ_HAVE_OPENPGM1 +    zmq_assert (send_fd_); +    zmq_assert (receive_fd_); +    zmq_assert (!rdata_notify_fd_); +#elif ZMQ_HAVE_OPENPGM2 +    zmq_assert (send_fd_); +    zmq_assert (receive_fd_); +    zmq_assert (rdata_notify_fd_); +#endif      //  Preallocate pollfds array.      int fds_array_size = pgm_sender_fd_count; @@ -530,8 +540,14 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_)      zmq_assert (rc == pgm_sender_fd_count);      //  Store pfds into user allocated space. +#ifdef ZMQ_HAVE_OPENPGM1      *receive_fd_ = fds [0].fd;      *send_fd_ = fds [1].fd; +#elif ZMQ_HAVE_OPENPGM2 +    *receive_fd_ = fds [0].fd; +    *rdata_notify_fd_ = fds [1].fd; +    *send_fd_ = fds [2].fd; +#endif      delete [] fds; @@ -542,10 +558,9 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_)  size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)  { -    ssize_t nbytes = 0; -  #ifdef ZMQ_HAVE_OPENPGM1 +    ssize_t nbytes = 0;      iovec iov = {data_,data_len_};      nbytes = pgm_transport_send_packetv (g_transport, &iov, 1,  @@ -561,13 +576,30 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)      //  now. We have to call write_one_pkt again.      nbytes = nbytes == -1 ? 0 : nbytes; -    zmq_log (4, "wrote %iB, %s(%i)\n", (int)nbytes, __FILE__, __LINE__); +#elif ZMQ_HAVE_OPENPGM2 + +    size_t nbytes = 0; +    +    PGMIOStatus status = pgm_send (g_transport, data_, data_len_, &nbytes); + +    if (nbytes != data_len_) { +        zmq_log (1, "status %i, data_len %i, wrote %iB, %s(%i)\n",  +            (int) status, (int) data_len_, (int) nbytes, __FILE__, __LINE__); + +        zmq_assert (status == PGM_IO_STATUS_AGAIN2); +        zmq_assert (nbytes == 0); +    } +#endif + +    zmq_log (4, "wrote %i/%iB, %s(%i)\n", (int) nbytes, (int) data_len_, +        __FILE__, __LINE__);      // We have to write all data as one packet.      if (nbytes > 0) { -        zmq_assert (nbytes == (ssize_t)data_len_); +        zmq_log (1, "data sent %i, %s(%i)\n", (int) nbytes,  +            __FILE__, __LINE__); +        zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_);      } -#endif      return nbytes;  } @@ -603,15 +635,17 @@ size_t zmq::pgm_socket_t::get_max_apdu_at_once (size_t readbuf_size_)  //  content via pgm_transport_send() calls or unused with pgm_packetv_free1().   void *zmq::pgm_socket_t::get_buffer (size_t *size_)  { -#ifdef ZMQ_HAVE_OPENPGM1      //  Store size.      *size_ = get_max_tsdu_size (); -    //  Allocate one packet. +#ifdef ZMQ_HAVE_OPENPGM1 +    //  Allocate one packet in tx window.      return pgm_packetv_alloc (g_transport, false);  #elif ZMQ_HAVE_OPENPGM2 -    zmq_assert (false); - +    //  Allocate buffer. +    unsigned char *apdu_buff = new unsigned char [*size_];  +    zmq_assert (apdu_buff); +    return apdu_buff;  #endif  } @@ -622,7 +656,7 @@ void zmq::pgm_socket_t::free_buffer (void *data_)  #ifdef ZMQ_HAVE_OPENPGM1      pgm_packetv_free1 (g_transport, data_, false);  #elif ZMQ_HAVE_OPENPGM2 -    zmq_assert (false); +    delete [] (unsigned char*) data_;  #endif  } @@ -718,6 +752,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)              zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n",                   status, (int) nbytes_rec, __FILE__, __LINE__); +            zmq_assert (false); +              nbytes_rec = 0;              return -1;          } @@ -767,21 +803,34 @@ void zmq::pgm_socket_t::process_upstream (void)  {      zmq_log (1, "On upstream packet, %s(%i)\n", __FILE__, __LINE__); -    ssize_t dummy_bytes = 0; +    pgm_msgv_t dummy_msg;  #ifdef ZMQ_HAVE_OPENPGM1 +    ssize_t dummy_bytes = 0; +      //  We acctually do not want to read any data here we are going to       //  process NAK. -    pgm_msgv_t dummy_msg;      dummy_bytes = pgm_transport_recvmsgv (g_transport, &dummy_msg,          1, MSG_DONTWAIT); + +    //  No data should be returned. +    zmq_assert (dummy_bytes == -1 && errno == EAGAIN); +  #elif defined ZMQ_HAVE_OPENPGM2 -    zmq_assert (false); +    size_t dummy_bytes = 0; +    GError *pgm_error = NULL; + +    PGMIOStatus status = pgm_recvmsgv (g_transport, &dummy_msg, +        1, MSG_DONTWAIT, &dummy_bytes, &pgm_error); + +    zmq_log (1, "upstream status %i, nbytes %i, %s(%i)\n", +        (int) status, (int) dummy_bytes, __FILE__, __LINE__); + +    //  No data should be returned. +    zmq_assert (dummy_bytes == 0 && status == PGM_IO_STATUS_AGAIN);  #endif -    //  No data should be returned. -    zmq_assert (dummy_bytes == -1 && errno == EAGAIN);  }  #endif diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index 5225e50..473e58c 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -65,7 +65,7 @@ namespace zmq          //   Get sender and receiver fds and store it to user allocated           //   memory. Receive fd is used to process NAKs from peers. -        int get_sender_fds (int *send_fd_, int *receive_fd_); +        int get_sender_fds (int *send_fd_, int *receive_fd_, int *rdata_notify_fd_ = NULL);          //  Send data as one APDU, transmit window owned memory.          size_t send (unsigned char *data_, size_t data_len_); @@ -143,8 +143,12 @@ namespace zmq          size_t pgm_msgv_len;          //  Sender transport uses 2 fd. +#ifdef ZMQ_HAVE_OPENPGM1          enum {pgm_sender_fd_count = 2}; -     +#elif ZMQ_HAVE_OPENPGM2 +        enum {pgm_sender_fd_count = 3}; +#endif +          //  Receiver transport uses 2 fd.          enum {pgm_receiver_fd_count = 2};  #endif | 
