diff options
Diffstat (limited to 'src/pgm_sender.cpp')
-rw-r--r-- | src/pgm_sender.cpp | 94 |
1 files changed, 35 insertions, 59 deletions
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 880bb09..0d87ee0 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -25,12 +25,13 @@ #include "windows.hpp" #endif -#include <iostream> +#include <stdlib.h> #include "io_thread.hpp" #include "pgm_sender.hpp" #include "err.hpp" #include "wire.hpp" +#include "stdint.hpp" zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, const options_t &options_) : @@ -38,18 +39,21 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, encoder (0, false), pgm_socket (false, options_), options (options_), - inout (NULL), out_buffer (NULL), out_buffer_size (0), - write_size (0), - write_pos (0), - first_message_offset (-1) + write_size (0) { } int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) { - return pgm_socket.init (udp_encapsulation_, network_); + int rc = pgm_socket.init (udp_encapsulation_, network_); + if (rc != 0) + return rc; + + out_buffer_size = pgm_socket.get_max_tsdu_size (); + out_buffer = (unsigned char*) malloc (out_buffer_size); + zmq_assert (out_buffer); } void zmq::pgm_sender_t::plug (i_inout *inout_) @@ -61,17 +65,11 @@ void zmq::pgm_sender_t::plug (i_inout *inout_) encoder.set_inout (inout_); - // Fill fds from PGM transport. - pgm_socket.get_sender_fds - (&downlink_socket_fd, &uplink_socket_fd, &rdata_notify_fd); - - // Add downlink_socket_fd into poller. + // Fill fds from PGM transport and add them to the poller. + pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, + &rdata_notify_fd); handle = add_fd (downlink_socket_fd); - - // Add uplink_socket_fd into the poller. uplink_handle = add_fd (uplink_socket_fd); - - // Add rdata_notify_fd into the poller. rdata_notify_handle = add_fd (rdata_notify_fd); // Set POLLIN. We wont never want to stop polling for uplink = we never @@ -81,8 +79,6 @@ void zmq::pgm_sender_t::plug (i_inout *inout_) // Set POLLOUT for downlink_socket_handle. set_pollout (handle); - - inout = inout_; } void zmq::pgm_sender_t::unplug () @@ -91,7 +87,6 @@ void zmq::pgm_sender_t::unplug () rm_fd (uplink_handle); rm_fd (rdata_notify_handle); encoder.set_inout (NULL); - inout = NULL; } void zmq::pgm_sender_t::revive () @@ -103,14 +98,14 @@ void zmq::pgm_sender_t::revive () zmq::pgm_sender_t::~pgm_sender_t () { if (out_buffer) { - pgm_socket.free_buffer (out_buffer); + free (out_buffer); out_buffer = NULL; } } -// In event on sender side means NAK or SPMR receiving from some peer. void zmq::pgm_sender_t::in_event () { + // In event on sender side means NAK or SPMR receiving from some peer. pgm_socket.process_upstream (); } @@ -118,55 +113,36 @@ void zmq::pgm_sender_t::out_event () { // POLLOUT event from send socket. If write buffer is empty, // try to read new data from the encoder. - if (write_pos == write_size) { - - // Get buffer if we do not have already one. - if (!out_buffer) { - out_buffer = (unsigned char*) - pgm_socket.get_buffer (&out_buffer_size); - } + if (write_size == 0) { - assert (out_buffer_size > 0); - - // First two bytes /sizeof (uint16_t)/ are used to store message - // offset in following steps. + // First two bytes (sizeof uint16_t) are used to store message + // offset in following steps. Note that by passing our buffer to + // the get data function we prevent it from returning its own buffer. unsigned char *bf = out_buffer + sizeof (uint16_t); - write_size = out_buffer_size - sizeof (uint16_t); - encoder.get_data (&bf, &write_size, &first_message_offset); - write_pos = 0; + size_t bfsz = out_buffer_size - sizeof (uint16_t); + int offset = -1; + encoder.get_data (&bf, &bfsz, &offset); // If there are no data to write stop polling for output. - if (!write_size) { + if (!bfsz) { reset_pollout (handle); - } else { - // Addning uint16_t for offset in a case when encoder returned > 0B. - write_size += sizeof (uint16_t); + return; } - } - - // If there are any data to write, write them into the socket. - // Note that all data has to written in one write_one_pkt_with_offset call. - if (write_pos < write_size) { - size_t nbytes = write_one_pkt_with_offset (out_buffer + write_pos, - write_size - write_pos, (uint16_t) first_message_offset); - - // We can write either all data or 0 which means rate limit reached. - zmq_assert (write_size - write_pos == nbytes || nbytes == 0); - write_pos += nbytes; + // Put offset information in the buffer. + write_size = bfsz + sizeof (uint16_t); + put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset); } -} -size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_, - size_t size_, uint16_t offset_) -{ - // Put offset information in the buffer. - put_uint16 (data_, offset_); - - // Send data. - size_t nbytes = pgm_socket.send (data_, size_); + // Send the data. + size_t nbytes = pgm_socket.send (out_buffer, write_size); - return nbytes; + // We can write either all data or 0 which means rate limit reached. + if (nbytes == write_size) + write_size = 0; + else + zmq_assert (nbytes == 0); } + #endif |