diff options
Diffstat (limited to 'src/pgm_sender.cpp')
-rw-r--r-- | src/pgm_sender.cpp | 72 |
1 files changed, 63 insertions, 9 deletions
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 9aeb7a9..3fb24c1 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -4,16 +4,16 @@ This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by + the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. 0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. + GNU Lesser General Public License for more details. - You should have received a copy of the Lesser GNU General Public License + You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. */ @@ -36,6 +36,8 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, const options_t &options_) : io_object_t (parent_), + has_tx_timer (false), + has_rx_timer (false), encoder (0), pgm_socket (false, options_), options (options_), @@ -58,7 +60,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) return rc; } -void zmq::pgm_sender_t::plug (i_inout *inout_) +void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_) { // Alocate 2 fds for PGM socket. int downlink_socket_fd = 0; @@ -89,6 +91,16 @@ void zmq::pgm_sender_t::plug (i_inout *inout_) void zmq::pgm_sender_t::unplug () { + if (has_rx_timer) { + cancel_timer (rx_timer_id); + has_rx_timer = false; + } + + if (has_tx_timer) { + cancel_timer (tx_timer_id); + has_tx_timer = false; + } + rm_fd (handle); rm_fd (uplink_handle); rm_fd (rdata_notify_handle); @@ -96,13 +108,19 @@ void zmq::pgm_sender_t::unplug () encoder.set_inout (NULL); } -void zmq::pgm_sender_t::revive () +void zmq::pgm_sender_t::terminate () +{ + unplug (); + delete this; +} + +void zmq::pgm_sender_t::activate_out () { set_pollout (handle); out_event (); } -void zmq::pgm_sender_t::resume_input () +void zmq::pgm_sender_t::activate_in () { zmq_assert (false); } @@ -117,8 +135,18 @@ zmq::pgm_sender_t::~pgm_sender_t () void zmq::pgm_sender_t::in_event () { - // In event on sender side means NAK or SPMR receiving from some peer. + if (has_rx_timer) { + cancel_timer (rx_timer_id); + has_rx_timer = false; + } + + // In-event on sender side means NAK or SPMR receiving from some peer. pgm_socket.process_upstream (); + if (errno == ENOMEM || errno == EBUSY) { + const long timeout = pgm_socket.get_rx_timeout (); + add_timer (timeout, rx_timer_id); + has_rx_timer = true; + } } void zmq::pgm_sender_t::out_event () @@ -146,14 +174,40 @@ void zmq::pgm_sender_t::out_event () put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset); } + if (has_tx_timer) { + cancel_timer (tx_timer_id); + has_tx_timer = false; + } + // Send the data. size_t nbytes = pgm_socket.send (out_buffer, write_size); // We can write either all data or 0 which means rate limit reached. - if (nbytes == write_size) + if (nbytes == write_size) { write_size = 0; - else + } else { zmq_assert (nbytes == 0); + + if (errno == ENOMEM) { + const long timeout = pgm_socket.get_tx_timeout (); + add_timer (timeout, tx_timer_id); + has_tx_timer = true; + } else + zmq_assert (errno == EBUSY); + } +} + +void zmq::pgm_sender_t::timer_event (int token) +{ + // Timer cancels on return by poller_base. + if (token == rx_timer_id) { + has_rx_timer = false; + in_event (); + } else if (token == tx_timer_id) { + has_tx_timer = false; + out_event (); + } else + zmq_assert (false); } #endif |