From 11fec367d1a13c3f0248f8542eb805b8d9571685 Mon Sep 17 00:00:00 2001 From: malosek Date: Tue, 5 Jan 2010 11:22:14 +0100 Subject: added pending event fd handling by the pgm_sender --- src/pgm_sender.cpp | 9 ++++++++- src/pgm_sender.hpp | 1 + src/pgm_socket.cpp | 10 +++++++--- src/pgm_socket.hpp | 2 +- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 010d99f..3f08d8e 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -54,6 +54,8 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) out_buffer_size = pgm_socket.get_max_tsdu_size (); out_buffer = (unsigned char*) malloc (out_buffer_size); zmq_assert (out_buffer); + + return rc; } void zmq::pgm_sender_t::plug (i_inout *inout_) @@ -62,20 +64,24 @@ void zmq::pgm_sender_t::plug (i_inout *inout_) int downlink_socket_fd = 0; int uplink_socket_fd = 0; int rdata_notify_fd = 0; + int pending_notify_fd = 0; encoder.set_inout (inout_); // Fill fds from PGM transport and add them to the poller. pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, - &rdata_notify_fd); + &rdata_notify_fd, &pending_notify_fd); + handle = add_fd (downlink_socket_fd); uplink_handle = add_fd (uplink_socket_fd); rdata_notify_handle = add_fd (rdata_notify_fd); + pending_notify_handle = add_fd (pending_notify_fd); // Set POLLIN. We wont never want to stop polling for uplink = we never // want to stop porocess NAKs. set_pollin (uplink_handle); set_pollin (rdata_notify_handle); + set_pollin (pending_notify_handle); // Set POLLOUT for downlink_socket_handle. set_pollout (handle); @@ -86,6 +92,7 @@ void zmq::pgm_sender_t::unplug () rm_fd (handle); rm_fd (uplink_handle); rm_fd (rdata_notify_handle); + rm_fd (pending_notify_handle); encoder.set_inout (NULL); } diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 8eb9d88..43adb8b 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -72,6 +72,7 @@ namespace zmq handle_t handle; handle_t uplink_handle; handle_t rdata_notify_handle; + handle_t pending_notify_handle; // Output buffer from pgm_socket. unsigned char *out_buffer; diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 0fbe58a..11084ff 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -375,13 +375,15 @@ void zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_, // sender_fd is from pgm_transport->send_sock. // receive_fd_ is from transport->recv_sock. // rdata_notify_fd_ is from transport->rdata_notify. +// pending_notify_fd_ is from transport->pending_notify. void zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, - int *rdata_notify_fd_) + int *rdata_notify_fd_, int *pending_notify_fd_) { zmq_assert (send_fd_); zmq_assert (receive_fd_); zmq_assert (rdata_notify_fd_); + zmq_assert (pending_notify_fd_); // recv_sock2 should not be used - check it. zmq_assert (transport->recv_sock2 == -1); @@ -390,10 +392,12 @@ void zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, zmq_assert (transport->can_send_data); zmq_assert (!transport->can_recv_data); - // Take FDs directly from transport. + // Take FDs from transport. + *send_fd_ = pgm_transport_get_send_fd (transport); *receive_fd_ = pgm_transport_get_recv_fd (transport); + *rdata_notify_fd_ = pgm_transport_get_repair_fd (transport); - *send_fd_ = pgm_transport_get_send_fd (transport); + *pending_notify_fd_ = pgm_transport_get_pending_fd (transport); } // Send one APDU, transmit window owned memory. diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index 502496a..b9f55d1 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -56,7 +56,7 @@ namespace zmq // Get sender and receiver fds and store it to user allocated // memory. Receive fd is used to process NAKs from peers. void get_sender_fds (int *send_fd_, int *receive_fd_, - int *rdata_notify_fd_); + int *rdata_notify_fd_, int *pending_notify_fd_); // Send data as one APDU, transmit window owned memory. size_t send (unsigned char *data_, size_t data_len_); -- cgit v1.2.3