From 08b02a43ae21e833a04d274fb20ef3bbc73c0d09 Mon Sep 17 00:00:00 2001 From: malosek Date: Tue, 3 Nov 2009 14:11:53 +0100 Subject: fixed get_sender_fds and get_receiver_fds for openpgm2 --- src/pgm_socket.cpp | 83 ++++++++++++++++++++++++++++++++---------------------- src/pgm_socket.hpp | 2 +- 2 files changed, 51 insertions(+), 34 deletions(-) diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 0a4ffa3..89efe6b 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -269,9 +269,9 @@ int zmq::pgm_socket_t::open_transport (void) // Set transport->can_send_data = FALSE. // Note that NAKs are still generated by the transport. -#ifdef ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM1 rc = pgm_transport_set_recv_only (g_transport, false); -#elif ZMQ_HAVE_OPENPGM2 +#elif defined ZMQ_HAVE_OPENPGM2 rc = pgm_transport_set_recv_only (g_transport, true, false); #endif if (rc != pgm_ok) { @@ -479,12 +479,13 @@ void zmq::pgm_socket_t::close_transport (void) // Get receiver fds. recv_fd is from transport->recv_sock // waiting_pipe_fd is from transport->waiting_pipe [0] -int zmq::pgm_socket_t::get_receiver_fds (int *recv_fd_, +int zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_, int *waiting_pipe_fd_) { -#ifdef ZMQ_HAVE_WINDOWS - zmq_assert (false); -#else + zmq_assert (receive_fd_); + zmq_assert (waiting_pipe_fd_); + +#if defined ZMQ_HAVE_OPENPGM1 // For POLLIN there are 2 pollfds in pgm_transport. int fds_array_size = pgm_receiver_fd_count; pollfd *fds = new pollfd [fds_array_size]; @@ -500,39 +501,46 @@ int zmq::pgm_socket_t::get_receiver_fds (int *recv_fd_, zmq_assert (rc == pgm_receiver_fd_count); // Store pfds into user allocated space. - *recv_fd_ = fds [0].fd; + *receive_fd_ = fds [0].fd; *waiting_pipe_fd_ = fds [1].fd; delete [] fds; + +#elif defined ZMQ_HAVE_OPENPGM2 + // recv_sock2 should not be used - check it. + zmq_assert (g_transport->recv_sock2 == -1); + + // Check if transport can receive data and can not send. + zmq_assert (g_transport->can_recv_data); + zmq_assert (!g_transport->can_send_data); + + // Take FDs directly from transport. + *receive_fd_ = g_transport->recv_sock; + *waiting_pipe_fd_ = pgm_notify_get_fd (&g_transport->pending_notify); #endif + return pgm_receiver_fd_count; } -// Get fds and store them into user allocated memory. -// sender_fd is from pgm_transport->send_sock. -// receive_fd_ is from transport->recv_sock. +// Get fds and store them into user allocated memory. +// sender_fd is from pgm_transport->send_sock. +// receive_fd_ is from transport->recv_sock. +// rdata_notify_fd_ is from transport->rdata_notify (PGM2 only). int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, int *rdata_notify_fd_) { -#ifdef ZMQ_HAVE_OPENPGM1 zmq_assert (send_fd_); zmq_assert (receive_fd_); + +#if defined ZMQ_HAVE_OPENPGM1 zmq_assert (!rdata_notify_fd_); -#elif ZMQ_HAVE_OPENPGM2 - zmq_assert (send_fd_); - zmq_assert (receive_fd_); - zmq_assert (rdata_notify_fd_); -#endif -#ifdef ZMQ_HAVE_WINDOWS - zmq_assert (false); -#else // Preallocate pollfds array. int fds_array_size = pgm_sender_fd_count; pollfd *fds = new pollfd [fds_array_size]; memset (fds, '\0', fds_array_size * sizeof (fds)); - // Retrieve pollfds from pgm_transport + // Retrieve pollfds from pgm_transport. int rc = pgm_transport_poll_info (g_transport, fds, &fds_array_size, POLLOUT | POLLIN); @@ -541,18 +549,27 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, // Note that fds_array_size parameter can be // changed inside pgm_transport_poll_info call. zmq_assert (rc == pgm_sender_fd_count); - + // Store pfds into user allocated space. -#ifdef ZMQ_HAVE_OPENPGM1 *receive_fd_ = fds [0].fd; *send_fd_ = fds [1].fd; -#elif ZMQ_HAVE_OPENPGM2 - *receive_fd_ = fds [0].fd; - *rdata_notify_fd_ = fds [1].fd; - *send_fd_ = fds [2].fd; -#endif delete [] fds; + +#elif defined ZMQ_HAVE_OPENPGM2 + zmq_assert (rdata_notify_fd_); + + // recv_sock2 should not be used - check it. + zmq_assert (g_transport->recv_sock2 == -1); + + // Check if transport can send data and can not receive. + zmq_assert (g_transport->can_send_data); + zmq_assert (!g_transport->can_recv_data); + + // Take FDs directly from transport. + *receive_fd_ = g_transport->recv_sock; + *rdata_notify_fd_ = pgm_notify_get_fd (&g_transport->rdata_notify); + *send_fd_ = g_transport->send_sock; #endif return pgm_sender_fd_count; @@ -562,7 +579,7 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) { -#ifdef ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM1 ssize_t nbytes = 0; iovec iov = {data_,data_len_}; @@ -580,7 +597,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) // now. We have to call write_one_pkt again. nbytes = nbytes == -1 ? 0 : nbytes; -#elif ZMQ_HAVE_OPENPGM2 +#elif defined ZMQ_HAVE_OPENPGM2 size_t nbytes = 0; @@ -642,10 +659,10 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_) // Store size. *size_ = get_max_tsdu_size (); -#ifdef ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM1 // Allocate one packet in tx window. return pgm_packetv_alloc (g_transport, false); -#elif ZMQ_HAVE_OPENPGM2 +#elif defined ZMQ_HAVE_OPENPGM2 // Allocate buffer. unsigned char *apdu_buff = new unsigned char [*size_]; zmq_assert (apdu_buff); @@ -657,9 +674,9 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_) // via pgm_packetv_alloc(). void zmq::pgm_socket_t::free_buffer (void *data_) { -#ifdef ZMQ_HAVE_OPENPGM1 +#if defined ZMQ_HAVE_OPENPGM1 pgm_packetv_free1 (g_transport, data_, false); -#elif ZMQ_HAVE_OPENPGM2 +#elif defined ZMQ_HAVE_OPENPGM2 delete [] (unsigned char*) data_; #endif } diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index 9b9ae45..71fa9c6 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -58,7 +58,7 @@ namespace zmq void close_transport (void); // Get receiver fds and store them into user allocated memory. - int get_receiver_fds (int *recv_fd_, int *waiting_pipe_fd_); + int get_receiver_fds (int *receive_fd_, int *waiting_pipe_fd_); // Get sender and receiver fds and store it to user allocated // memory. Receive fd is used to process NAKs from peers. -- cgit v1.2.3