From 5852db451a76905336601c5ba3e4f33006f007fb Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 28 Dec 2009 11:51:06 +0100 Subject: PGM code cleanup --- src/pgm_receiver.cpp | 148 ++++++++++++++++++++------------------------------- 1 file changed, 57 insertions(+), 91 deletions(-) (limited to 'src/pgm_receiver.cpp') diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 1d4d695..0eb92d2 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -27,9 +27,6 @@ #include "windows.hpp" #endif -#include -#include - #include "pgm_receiver.hpp" #include "err.hpp" #include "stdint.hpp" @@ -58,20 +55,12 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) void zmq::pgm_receiver_t::plug (i_inout *inout_) { - // Allocate 2 fds one for socket second for waiting pipe. + // Retrieve PGM fds and start polling. int socket_fd; int waiting_pipe_fd; - - // Fill socket_fd and waiting_pipe_fd from PGM transport pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd); - - // Add socket_fd into poller. socket_handle = add_fd (socket_fd); - - // Add waiting_pipe_fd into poller. pipe_handle = add_fd (waiting_pipe_fd); - - // Set POLLIN for both handlers. set_pollin (pipe_handle); set_pollin (socket_handle); @@ -81,15 +70,16 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_) void zmq::pgm_receiver_t::unplug () { // Delete decoders. - for (peer_t::iterator it = peers.begin (); it != peers.end (); it++) { + for (peers_t::iterator it = peers.begin (); it != peers.end (); it++) { if (it->second.decoder != NULL) delete it->second.decoder; } - peers.clear (); + // Stop polling. rm_fd (socket_handle); rm_fd (pipe_handle); + inout = NULL; } @@ -98,101 +88,77 @@ void zmq::pgm_receiver_t::revive () zmq_assert (false); } -// POLLIN event from socket or waiting_pipe. void zmq::pgm_receiver_t::in_event () { - // Iterator to peers map. - peer_t::iterator it; - - // Data from PGM socket. - unsigned char *raw_data = NULL; + // Read data from the underlying pgm_socket. + unsigned char *data = NULL; const pgm_tsi_t *tsi = NULL; - ssize_t nbytes = 0; - - do { - - // Read data from underlying pgm_socket. - nbytes = pgm_socket.receive ((void**) &raw_data, &tsi); - - // No ODATA or RDATA. - if (!nbytes) - break; + ssize_t received = pgm_socket.receive ((void**) &data, &tsi); - // Fid TSI in peers list. - it = peers.find (*tsi); + // No data to process. This may happen if the packet received is + // neither ODATA nor ODATA. + if (received == 0) + return; - // Data loss. - if (nbytes == -1) { + // Find the peer based on its TSI. + peers_t::iterator it = peers.find (*tsi); - zmq_assert (it != peers.end ()); - - // Delete decoder and set joined to false. - it->second.joined = false; - - if (it->second.decoder != NULL) { - delete it->second.decoder; - it->second.decoder = NULL; - } - - break; + // Data loss. Delete decoder and mark the peer as disjoint. + if (received == -1) { + zmq_assert (it != peers.end ()); + it->second.joined = false; + if (it->second.decoder != NULL) { + delete it->second.decoder; + it->second.decoder = NULL; } + return; + } - // Read offset of the fist message in current APDU. - zmq_assert ((size_t) nbytes >= sizeof (uint16_t)); - uint16_t apdu_offset = get_uint16 (raw_data); + // New peer. Add it to the list of know but unjoint peers. + if (it == peers.end ()) { + peer_info_t peer_info = {false, NULL}; + it = peers.insert (std::make_pair (*tsi, peer_info)).first; + } - // Shift raw_data & decrease nbytes by the first message offset - // information (sizeof uint16_t). - raw_data += sizeof (uint16_t); - nbytes -= sizeof (uint16_t); + // Read the offset of the fist message in the current packet. + zmq_assert ((size_t) received >= sizeof (uint16_t)); + uint16_t offset = get_uint16 (data); + data += sizeof (uint16_t); + received -= sizeof (uint16_t); - // New peer. - if (it == peers.end ()) { - peer_info_t peer_info = {false, NULL}; - it = peers.insert (std::make_pair (*tsi, peer_info)).first; - } + // Join the stream if needed. + if (!it->second.joined) { - // There is not beginning of the message in current APDU and we - // are not joined jet -> throwing data. - if (apdu_offset == 0xFFFF && !it->second.joined) { - break; - } + // There is no beginning of the message in current packet. + // Ignore the data. + if (offset == 0xffff) + return; - // Now is the possibility to join the stream. - if (!it->second.joined) { - - zmq_assert (apdu_offset <= nbytes); - zmq_assert (it->second.decoder == NULL); + zmq_assert (offset <= received); + zmq_assert (it->second.decoder == NULL); - // We have to move data to the begining of the first message. - raw_data += apdu_offset; - nbytes -= apdu_offset; + // We have to move data to the begining of the first message. + data += offset; + received -= offset; - // Joined the stream. - it->second.joined = true; + // Mark the stream as joined. + it->second.joined = true; - // Create and connect decoder for joined peer. - it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0); - it->second.decoder->set_inout (inout); - } - - if (nbytes > 0) { - - // Push all the data to the decoder. - // TODO: process_buffer may not process entire buffer! - it->second.decoder->process_buffer (raw_data, nbytes); - } + // Create and connect decoder for the peer. + it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0); + it->second.decoder->set_inout (inout); + } - } while (nbytes > 0); + if (received) { - // Flush any messages decoder may have produced to the dispatcher. - inout->flush (); - -} + // Push all the data to the decoder. + // TODO: process_buffer may not process entire buffer! + size_t processed = it->second.decoder->process_buffer (data, received); + zmq_assert (processed == received); -void zmq::pgm_receiver_t::out_event () -{ - zmq_assert (false); + // Flush any messages decoder may have produced. + inout->flush (); + } } #endif -- cgit v1.2.3