summaryrefslogtreecommitdiff
path: root/src/pgm_sender.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-12-28 11:51:06 +0100
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-12-28 11:51:06 +0100
commit5852db451a76905336601c5ba3e4f33006f007fb (patch)
treeb881da58001ebd13519b62237acbb84a4a6808e7 /src/pgm_sender.cpp
parentaebff623f36efddc0de7a3192832b61802f8cec8 (diff)
PGM code cleanup
Diffstat (limited to 'src/pgm_sender.cpp')
-rw-r--r--src/pgm_sender.cpp94
1 files changed, 35 insertions, 59 deletions
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 880bb09..0d87ee0 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -25,12 +25,13 @@
#include "windows.hpp"
#endif
-#include <iostream>
+#include <stdlib.h>
#include "io_thread.hpp"
#include "pgm_sender.hpp"
#include "err.hpp"
#include "wire.hpp"
+#include "stdint.hpp"
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
const options_t &options_) :
@@ -38,18 +39,21 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
encoder (0, false),
pgm_socket (false, options_),
options (options_),
- inout (NULL),
out_buffer (NULL),
out_buffer_size (0),
- write_size (0),
- write_pos (0),
- first_message_offset (-1)
+ write_size (0)
{
}
int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
{
- return pgm_socket.init (udp_encapsulation_, network_);
+ int rc = pgm_socket.init (udp_encapsulation_, network_);
+ if (rc != 0)
+ return rc;
+
+ out_buffer_size = pgm_socket.get_max_tsdu_size ();
+ out_buffer = (unsigned char*) malloc (out_buffer_size);
+ zmq_assert (out_buffer);
}
void zmq::pgm_sender_t::plug (i_inout *inout_)
@@ -61,17 +65,11 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)
encoder.set_inout (inout_);
- // Fill fds from PGM transport.
- pgm_socket.get_sender_fds
- (&downlink_socket_fd, &uplink_socket_fd, &rdata_notify_fd);
-
- // Add downlink_socket_fd into poller.
+ // Fill fds from PGM transport and add them to the poller.
+ pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
+ &rdata_notify_fd);
handle = add_fd (downlink_socket_fd);
-
- // Add uplink_socket_fd into the poller.
uplink_handle = add_fd (uplink_socket_fd);
-
- // Add rdata_notify_fd into the poller.
rdata_notify_handle = add_fd (rdata_notify_fd);
// Set POLLIN. We wont never want to stop polling for uplink = we never
@@ -81,8 +79,6 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)
// Set POLLOUT for downlink_socket_handle.
set_pollout (handle);
-
- inout = inout_;
}
void zmq::pgm_sender_t::unplug ()
@@ -91,7 +87,6 @@ void zmq::pgm_sender_t::unplug ()
rm_fd (uplink_handle);
rm_fd (rdata_notify_handle);
encoder.set_inout (NULL);
- inout = NULL;
}
void zmq::pgm_sender_t::revive ()
@@ -103,14 +98,14 @@ void zmq::pgm_sender_t::revive ()
zmq::pgm_sender_t::~pgm_sender_t ()
{
if (out_buffer) {
- pgm_socket.free_buffer (out_buffer);
+ free (out_buffer);
out_buffer = NULL;
}
}
-// In event on sender side means NAK or SPMR receiving from some peer.
void zmq::pgm_sender_t::in_event ()
{
+ // In event on sender side means NAK or SPMR receiving from some peer.
pgm_socket.process_upstream ();
}
@@ -118,55 +113,36 @@ void zmq::pgm_sender_t::out_event ()
{
// POLLOUT event from send socket. If write buffer is empty,
// try to read new data from the encoder.
- if (write_pos == write_size) {
-
- // Get buffer if we do not have already one.
- if (!out_buffer) {
- out_buffer = (unsigned char*)
- pgm_socket.get_buffer (&out_buffer_size);
- }
+ if (write_size == 0) {
- assert (out_buffer_size > 0);
-
- // First two bytes /sizeof (uint16_t)/ are used to store message
- // offset in following steps.
+ // First two bytes (sizeof uint16_t) are used to store message
+ // offset in following steps. Note that by passing our buffer to
+ // the get data function we prevent it from returning its own buffer.
unsigned char *bf = out_buffer + sizeof (uint16_t);
- write_size = out_buffer_size - sizeof (uint16_t);
- encoder.get_data (&bf, &write_size, &first_message_offset);
- write_pos = 0;
+ size_t bfsz = out_buffer_size - sizeof (uint16_t);
+ int offset = -1;
+ encoder.get_data (&bf, &bfsz, &offset);
// If there are no data to write stop polling for output.
- if (!write_size) {
+ if (!bfsz) {
reset_pollout (handle);
- } else {
- // Addning uint16_t for offset in a case when encoder returned > 0B.
- write_size += sizeof (uint16_t);
+ return;
}
- }
-
- // If there are any data to write, write them into the socket.
- // Note that all data has to written in one write_one_pkt_with_offset call.
- if (write_pos < write_size) {
- size_t nbytes = write_one_pkt_with_offset (out_buffer + write_pos,
- write_size - write_pos, (uint16_t) first_message_offset);
-
- // We can write either all data or 0 which means rate limit reached.
- zmq_assert (write_size - write_pos == nbytes || nbytes == 0);
- write_pos += nbytes;
+ // Put offset information in the buffer.
+ write_size = bfsz + sizeof (uint16_t);
+ put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);
}
-}
-size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_,
- size_t size_, uint16_t offset_)
-{
- // Put offset information in the buffer.
- put_uint16 (data_, offset_);
-
- // Send data.
- size_t nbytes = pgm_socket.send (data_, size_);
+ // Send the data.
+ size_t nbytes = pgm_socket.send (out_buffer, write_size);
- return nbytes;
+ // We can write either all data or 0 which means rate limit reached.
+ if (nbytes == write_size)
+ write_size = 0;
+ else
+ zmq_assert (nbytes == 0);
}
+
#endif