summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/pgm_socket.cpp83
-rw-r--r--src/pgm_socket.hpp2
2 files changed, 51 insertions, 34 deletions
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 0a4ffa3..89efe6b 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -269,9 +269,9 @@ int zmq::pgm_socket_t::open_transport (void)
// Set transport->can_send_data = FALSE.
// Note that NAKs are still generated by the transport.
-#ifdef ZMQ_HAVE_OPENPGM1
+#if defined ZMQ_HAVE_OPENPGM1
rc = pgm_transport_set_recv_only (g_transport, false);
-#elif ZMQ_HAVE_OPENPGM2
+#elif defined ZMQ_HAVE_OPENPGM2
rc = pgm_transport_set_recv_only (g_transport, true, false);
#endif
if (rc != pgm_ok) {
@@ -479,12 +479,13 @@ void zmq::pgm_socket_t::close_transport (void)
// Get receiver fds. recv_fd is from transport->recv_sock
// waiting_pipe_fd is from transport->waiting_pipe [0]
-int zmq::pgm_socket_t::get_receiver_fds (int *recv_fd_,
+int zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_,
int *waiting_pipe_fd_)
{
-#ifdef ZMQ_HAVE_WINDOWS
- zmq_assert (false);
-#else
+ zmq_assert (receive_fd_);
+ zmq_assert (waiting_pipe_fd_);
+
+#if defined ZMQ_HAVE_OPENPGM1
// For POLLIN there are 2 pollfds in pgm_transport.
int fds_array_size = pgm_receiver_fd_count;
pollfd *fds = new pollfd [fds_array_size];
@@ -500,39 +501,46 @@ int zmq::pgm_socket_t::get_receiver_fds (int *recv_fd_,
zmq_assert (rc == pgm_receiver_fd_count);
// Store pfds into user allocated space.
- *recv_fd_ = fds [0].fd;
+ *receive_fd_ = fds [0].fd;
*waiting_pipe_fd_ = fds [1].fd;
delete [] fds;
+
+#elif defined ZMQ_HAVE_OPENPGM2
+ // recv_sock2 should not be used - check it.
+ zmq_assert (g_transport->recv_sock2 == -1);
+
+ // Check if transport can receive data and can not send.
+ zmq_assert (g_transport->can_recv_data);
+ zmq_assert (!g_transport->can_send_data);
+
+ // Take FDs directly from transport.
+ *receive_fd_ = g_transport->recv_sock;
+ *waiting_pipe_fd_ = pgm_notify_get_fd (&g_transport->pending_notify);
#endif
+
return pgm_receiver_fd_count;
}
-// Get fds and store them into user allocated memory.
-// sender_fd is from pgm_transport->send_sock.
-// receive_fd_ is from transport->recv_sock.
+// Get fds and store them into user allocated memory.
+// sender_fd is from pgm_transport->send_sock.
+// receive_fd_ is from transport->recv_sock.
+// rdata_notify_fd_ is from transport->rdata_notify (PGM2 only).
int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
int *rdata_notify_fd_)
{
-#ifdef ZMQ_HAVE_OPENPGM1
zmq_assert (send_fd_);
zmq_assert (receive_fd_);
+
+#if defined ZMQ_HAVE_OPENPGM1
zmq_assert (!rdata_notify_fd_);
-#elif ZMQ_HAVE_OPENPGM2
- zmq_assert (send_fd_);
- zmq_assert (receive_fd_);
- zmq_assert (rdata_notify_fd_);
-#endif
-#ifdef ZMQ_HAVE_WINDOWS
- zmq_assert (false);
-#else
// Preallocate pollfds array.
int fds_array_size = pgm_sender_fd_count;
pollfd *fds = new pollfd [fds_array_size];
memset (fds, '\0', fds_array_size * sizeof (fds));
- // Retrieve pollfds from pgm_transport
+ // Retrieve pollfds from pgm_transport.
int rc = pgm_transport_poll_info (g_transport, fds, &fds_array_size,
POLLOUT | POLLIN);
@@ -541,18 +549,27 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
// Note that fds_array_size parameter can be
// changed inside pgm_transport_poll_info call.
zmq_assert (rc == pgm_sender_fd_count);
-
+
// Store pfds into user allocated space.
-#ifdef ZMQ_HAVE_OPENPGM1
*receive_fd_ = fds [0].fd;
*send_fd_ = fds [1].fd;
-#elif ZMQ_HAVE_OPENPGM2
- *receive_fd_ = fds [0].fd;
- *rdata_notify_fd_ = fds [1].fd;
- *send_fd_ = fds [2].fd;
-#endif
delete [] fds;
+
+#elif defined ZMQ_HAVE_OPENPGM2
+ zmq_assert (rdata_notify_fd_);
+
+ // recv_sock2 should not be used - check it.
+ zmq_assert (g_transport->recv_sock2 == -1);
+
+ // Check if transport can send data and can not receive.
+ zmq_assert (g_transport->can_send_data);
+ zmq_assert (!g_transport->can_recv_data);
+
+ // Take FDs directly from transport.
+ *receive_fd_ = g_transport->recv_sock;
+ *rdata_notify_fd_ = pgm_notify_get_fd (&g_transport->rdata_notify);
+ *send_fd_ = g_transport->send_sock;
#endif
return pgm_sender_fd_count;
@@ -562,7 +579,7 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
{
-#ifdef ZMQ_HAVE_OPENPGM1
+#if defined ZMQ_HAVE_OPENPGM1
ssize_t nbytes = 0;
iovec iov = {data_,data_len_};
@@ -580,7 +597,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
// now. We have to call write_one_pkt again.
nbytes = nbytes == -1 ? 0 : nbytes;
-#elif ZMQ_HAVE_OPENPGM2
+#elif defined ZMQ_HAVE_OPENPGM2
size_t nbytes = 0;
@@ -642,10 +659,10 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_)
// Store size.
*size_ = get_max_tsdu_size ();
-#ifdef ZMQ_HAVE_OPENPGM1
+#if defined ZMQ_HAVE_OPENPGM1
// Allocate one packet in tx window.
return pgm_packetv_alloc (g_transport, false);
-#elif ZMQ_HAVE_OPENPGM2
+#elif defined ZMQ_HAVE_OPENPGM2
// Allocate buffer.
unsigned char *apdu_buff = new unsigned char [*size_];
zmq_assert (apdu_buff);
@@ -657,9 +674,9 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_)
// via pgm_packetv_alloc().
void zmq::pgm_socket_t::free_buffer (void *data_)
{
-#ifdef ZMQ_HAVE_OPENPGM1
+#if defined ZMQ_HAVE_OPENPGM1
pgm_packetv_free1 (g_transport, data_, false);
-#elif ZMQ_HAVE_OPENPGM2
+#elif defined ZMQ_HAVE_OPENPGM2
delete [] (unsigned char*) data_;
#endif
}
diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp
index 9b9ae45..71fa9c6 100644
--- a/src/pgm_socket.hpp
+++ b/src/pgm_socket.hpp
@@ -58,7 +58,7 @@ namespace zmq
void close_transport (void);
// Get receiver fds and store them into user allocated memory.
- int get_receiver_fds (int *recv_fd_, int *waiting_pipe_fd_);
+ int get_receiver_fds (int *receive_fd_, int *waiting_pipe_fd_);
// Get sender and receiver fds and store it to user allocated
// memory. Receive fd is used to process NAKs from peers.