diff options
| -rw-r--r-- | src/pgm_socket.cpp | 83 | ||||
| -rw-r--r-- | src/pgm_socket.hpp | 2 | 
2 files changed, 51 insertions, 34 deletions
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 0a4ffa3..89efe6b 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -269,9 +269,9 @@ int zmq::pgm_socket_t::open_transport (void)          //  Set transport->can_send_data = FALSE.          //  Note that NAKs are still generated by the transport. -#ifdef ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM1          rc = pgm_transport_set_recv_only (g_transport, false); -#elif ZMQ_HAVE_OPENPGM2 +#elif defined ZMQ_HAVE_OPENPGM2          rc = pgm_transport_set_recv_only (g_transport, true, false);  #endif          if (rc != pgm_ok) { @@ -479,12 +479,13 @@ void zmq::pgm_socket_t::close_transport (void)  //   Get receiver fds. recv_fd is from transport->recv_sock  //   waiting_pipe_fd is from transport->waiting_pipe [0] -int zmq::pgm_socket_t::get_receiver_fds (int *recv_fd_,  +int zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_,       int *waiting_pipe_fd_)  { -#ifdef ZMQ_HAVE_WINDOWS -    zmq_assert (false); -#else +    zmq_assert (receive_fd_); +    zmq_assert (waiting_pipe_fd_); + +#if defined ZMQ_HAVE_OPENPGM1      //  For POLLIN there are 2 pollfds in pgm_transport.      int fds_array_size = pgm_receiver_fd_count;      pollfd *fds = new pollfd [fds_array_size]; @@ -500,39 +501,46 @@ int zmq::pgm_socket_t::get_receiver_fds (int *recv_fd_,      zmq_assert (rc == pgm_receiver_fd_count);      //  Store pfds into user allocated space. -    *recv_fd_ = fds [0].fd; +    *receive_fd_ = fds [0].fd;      *waiting_pipe_fd_ = fds [1].fd;      delete [] fds; + +#elif defined ZMQ_HAVE_OPENPGM2 +    //  recv_sock2 should not be used - check it. +    zmq_assert (g_transport->recv_sock2 == -1); + +    //  Check if transport can receive data and can not send. +    zmq_assert (g_transport->can_recv_data); +    zmq_assert (!g_transport->can_send_data); + +    //  Take FDs directly from transport. +    *receive_fd_ = g_transport->recv_sock; +    *waiting_pipe_fd_ = pgm_notify_get_fd (&g_transport->pending_notify);  #endif +      return pgm_receiver_fd_count;  } -//   Get fds and store them into user allocated memory.  -//   sender_fd is from pgm_transport->send_sock. -//   receive_fd_ is from  transport->recv_sock. +//  Get fds and store them into user allocated memory.  +//  sender_fd is from pgm_transport->send_sock. +//  receive_fd_ is from  transport->recv_sock. +//  rdata_notify_fd_ is from transport->rdata_notify (PGM2 only).  int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,       int *rdata_notify_fd_)  { -#ifdef ZMQ_HAVE_OPENPGM1      zmq_assert (send_fd_);      zmq_assert (receive_fd_); + +#if defined ZMQ_HAVE_OPENPGM1      zmq_assert (!rdata_notify_fd_); -#elif ZMQ_HAVE_OPENPGM2 -    zmq_assert (send_fd_); -    zmq_assert (receive_fd_); -    zmq_assert (rdata_notify_fd_); -#endif -#ifdef ZMQ_HAVE_WINDOWS -    zmq_assert (false); -#else      //  Preallocate pollfds array.      int fds_array_size = pgm_sender_fd_count;      pollfd *fds = new pollfd [fds_array_size];      memset (fds, '\0', fds_array_size * sizeof (fds)); -    //  Retrieve pollfds from pgm_transport +    //  Retrieve pollfds from pgm_transport.      int rc = pgm_transport_poll_info (g_transport, fds, &fds_array_size,           POLLOUT | POLLIN); @@ -541,18 +549,27 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,      //  Note that fds_array_size parameter can be       //  changed inside pgm_transport_poll_info call.      zmq_assert (rc == pgm_sender_fd_count); -  +      //  Store pfds into user allocated space. -#ifdef ZMQ_HAVE_OPENPGM1      *receive_fd_ = fds [0].fd;      *send_fd_ = fds [1].fd; -#elif ZMQ_HAVE_OPENPGM2 -    *receive_fd_ = fds [0].fd; -    *rdata_notify_fd_ = fds [1].fd; -    *send_fd_ = fds [2].fd; -#endif      delete [] fds; + +#elif defined ZMQ_HAVE_OPENPGM2 +    zmq_assert (rdata_notify_fd_); + +    //  recv_sock2 should not be used - check it. +    zmq_assert (g_transport->recv_sock2 == -1); + +    //  Check if transport can send data and can not receive. +    zmq_assert (g_transport->can_send_data); +    zmq_assert (!g_transport->can_recv_data); + +    //  Take FDs directly from transport. +    *receive_fd_ = g_transport->recv_sock; +    *rdata_notify_fd_ = pgm_notify_get_fd (&g_transport->rdata_notify); +    *send_fd_ = g_transport->send_sock;  #endif      return pgm_sender_fd_count; @@ -562,7 +579,7 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,  size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)  { -#ifdef ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM1      ssize_t nbytes = 0;      iovec iov = {data_,data_len_}; @@ -580,7 +597,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)      //  now. We have to call write_one_pkt again.      nbytes = nbytes == -1 ? 0 : nbytes; -#elif ZMQ_HAVE_OPENPGM2 +#elif defined ZMQ_HAVE_OPENPGM2      size_t nbytes = 0; @@ -642,10 +659,10 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_)      //  Store size.      *size_ = get_max_tsdu_size (); -#ifdef ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM1      //  Allocate one packet in tx window.      return pgm_packetv_alloc (g_transport, false); -#elif ZMQ_HAVE_OPENPGM2 +#elif defined ZMQ_HAVE_OPENPGM2      //  Allocate buffer.      unsigned char *apdu_buff = new unsigned char [*size_];       zmq_assert (apdu_buff); @@ -657,9 +674,9 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_)  //  via pgm_packetv_alloc().   void zmq::pgm_socket_t::free_buffer (void *data_)  { -#ifdef ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM1      pgm_packetv_free1 (g_transport, data_, false); -#elif ZMQ_HAVE_OPENPGM2 +#elif defined ZMQ_HAVE_OPENPGM2      delete [] (unsigned char*) data_;  #endif  } diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index 9b9ae45..71fa9c6 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -58,7 +58,7 @@ namespace zmq          void close_transport (void);          //   Get receiver fds and store them into user allocated memory. -        int get_receiver_fds (int *recv_fd_, int *waiting_pipe_fd_); +        int get_receiver_fds (int *receive_fd_, int *waiting_pipe_fd_);          //   Get sender and receiver fds and store it to user allocated           //   memory. Receive fd is used to process NAKs from peers.  | 
