summaryrefslogtreecommitdiff
path: root/src/pgm_receiver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/pgm_receiver.cpp')
-rw-r--r--src/pgm_receiver.cpp148
1 files changed, 57 insertions, 91 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 1d4d695..0eb92d2 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -27,9 +27,6 @@
#include "windows.hpp"
#endif
-#include <pgm/pgm.h>
-#include <iostream>
-
#include "pgm_receiver.hpp"
#include "err.hpp"
#include "stdint.hpp"
@@ -58,20 +55,12 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
void zmq::pgm_receiver_t::plug (i_inout *inout_)
{
- // Allocate 2 fds one for socket second for waiting pipe.
+ // Retrieve PGM fds and start polling.
int socket_fd;
int waiting_pipe_fd;
-
- // Fill socket_fd and waiting_pipe_fd from PGM transport
pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
-
- // Add socket_fd into poller.
socket_handle = add_fd (socket_fd);
-
- // Add waiting_pipe_fd into poller.
pipe_handle = add_fd (waiting_pipe_fd);
-
- // Set POLLIN for both handlers.
set_pollin (pipe_handle);
set_pollin (socket_handle);
@@ -81,15 +70,16 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_)
void zmq::pgm_receiver_t::unplug ()
{
// Delete decoders.
- for (peer_t::iterator it = peers.begin (); it != peers.end (); it++) {
+ for (peers_t::iterator it = peers.begin (); it != peers.end (); it++) {
if (it->second.decoder != NULL)
delete it->second.decoder;
}
-
peers.clear ();
+ // Stop polling.
rm_fd (socket_handle);
rm_fd (pipe_handle);
+
inout = NULL;
}
@@ -98,101 +88,77 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
-// POLLIN event from socket or waiting_pipe.
void zmq::pgm_receiver_t::in_event ()
{
- // Iterator to peers map.
- peer_t::iterator it;
-
- // Data from PGM socket.
- unsigned char *raw_data = NULL;
+ // Read data from the underlying pgm_socket.
+ unsigned char *data = NULL;
const pgm_tsi_t *tsi = NULL;
- ssize_t nbytes = 0;
-
- do {
-
- // Read data from underlying pgm_socket.
- nbytes = pgm_socket.receive ((void**) &raw_data, &tsi);
-
- // No ODATA or RDATA.
- if (!nbytes)
- break;
+ ssize_t received = pgm_socket.receive ((void**) &data, &tsi);
- // Fid TSI in peers list.
- it = peers.find (*tsi);
+ // No data to process. This may happen if the packet received is
+ // neither ODATA nor ODATA.
+ if (received == 0)
+ return;
- // Data loss.
- if (nbytes == -1) {
+ // Find the peer based on its TSI.
+ peers_t::iterator it = peers.find (*tsi);
- zmq_assert (it != peers.end ());
-
- // Delete decoder and set joined to false.
- it->second.joined = false;
-
- if (it->second.decoder != NULL) {
- delete it->second.decoder;
- it->second.decoder = NULL;
- }
-
- break;
+ // Data loss. Delete decoder and mark the peer as disjoint.
+ if (received == -1) {
+ zmq_assert (it != peers.end ());
+ it->second.joined = false;
+ if (it->second.decoder != NULL) {
+ delete it->second.decoder;
+ it->second.decoder = NULL;
}
+ return;
+ }
- // Read offset of the fist message in current APDU.
- zmq_assert ((size_t) nbytes >= sizeof (uint16_t));
- uint16_t apdu_offset = get_uint16 (raw_data);
+ // New peer. Add it to the list of know but unjoint peers.
+ if (it == peers.end ()) {
+ peer_info_t peer_info = {false, NULL};
+ it = peers.insert (std::make_pair (*tsi, peer_info)).first;
+ }
- // Shift raw_data & decrease nbytes by the first message offset
- // information (sizeof uint16_t).
- raw_data += sizeof (uint16_t);
- nbytes -= sizeof (uint16_t);
+ // Read the offset of the fist message in the current packet.
+ zmq_assert ((size_t) received >= sizeof (uint16_t));
+ uint16_t offset = get_uint16 (data);
+ data += sizeof (uint16_t);
+ received -= sizeof (uint16_t);
- // New peer.
- if (it == peers.end ()) {
- peer_info_t peer_info = {false, NULL};
- it = peers.insert (std::make_pair (*tsi, peer_info)).first;
- }
+ // Join the stream if needed.
+ if (!it->second.joined) {
- // There is not beginning of the message in current APDU and we
- // are not joined jet -> throwing data.
- if (apdu_offset == 0xFFFF && !it->second.joined) {
- break;
- }
+ // There is no beginning of the message in current packet.
+ // Ignore the data.
+ if (offset == 0xffff)
+ return;
- // Now is the possibility to join the stream.
- if (!it->second.joined) {
-
- zmq_assert (apdu_offset <= nbytes);
- zmq_assert (it->second.decoder == NULL);
+ zmq_assert (offset <= received);
+ zmq_assert (it->second.decoder == NULL);
- // We have to move data to the begining of the first message.
- raw_data += apdu_offset;
- nbytes -= apdu_offset;
+ // We have to move data to the begining of the first message.
+ data += offset;
+ received -= offset;
- // Joined the stream.
- it->second.joined = true;
+ // Mark the stream as joined.
+ it->second.joined = true;
- // Create and connect decoder for joined peer.
- it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0);
- it->second.decoder->set_inout (inout);
- }
-
- if (nbytes > 0) {
-
- // Push all the data to the decoder.
- // TODO: process_buffer may not process entire buffer!
- it->second.decoder->process_buffer (raw_data, nbytes);
- }
+ // Create and connect decoder for the peer.
+ it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0);
+ it->second.decoder->set_inout (inout);
+ }
- } while (nbytes > 0);
+ if (received) {
- // Flush any messages decoder may have produced to the dispatcher.
- inout->flush ();
-
-}
+ // Push all the data to the decoder.
+ // TODO: process_buffer may not process entire buffer!
+ size_t processed = it->second.decoder->process_buffer (data, received);
+ zmq_assert (processed == received);
-void zmq::pgm_receiver_t::out_event ()
-{
- zmq_assert (false);
+ // Flush any messages decoder may have produced.
+ inout->flush ();
+ }
}
#endif