From 9cee8f9c3e22f1e880988271ab1c31c92827efde Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 2 Feb 2010 15:11:25 +0100 Subject: problem with PGM messages larger than 1 MTU fixed --- src/pgm_receiver.cpp | 104 ++++++++++++++++++++++++++------------------------- 1 file changed, 54 insertions(+), 50 deletions(-) diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index b611324..d0310cc 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -93,72 +93,76 @@ void zmq::pgm_receiver_t::in_event () // Read data from the underlying pgm_socket. unsigned char *data = NULL; const pgm_tsi_t *tsi = NULL; - ssize_t received = pgm_socket.receive ((void**) &data, &tsi); - // No data to process. This may happen if the packet received is - // neither ODATA nor ODATA. - if (received == 0) - return; - - // Find the peer based on its TSI. - peers_t::iterator it = peers.find (*tsi); - - // Data loss. Delete decoder and mark the peer as disjoint. - if (received == -1) { - zmq_assert (it != peers.end ()); - it->second.joined = false; - if (it->second.decoder != NULL) { - delete it->second.decoder; - it->second.decoder = NULL; + // TODO: This loop can effectively block other engines in the same I/O + // thread in the case of high load. + while (true) { + + // Get new batch of data. + ssize_t received = pgm_socket.receive ((void**) &data, &tsi); + + // No data to process. This may happen if the packet received is + // neither ODATA nor ODATA. + if (received == 0) + break; + + // Find the peer based on its TSI. + peers_t::iterator it = peers.find (*tsi); + + // Data loss. Delete decoder and mark the peer as disjoint. + if (received == -1) { + zmq_assert (it != peers.end ()); + it->second.joined = false; + if (it->second.decoder != NULL) { + delete it->second.decoder; + it->second.decoder = NULL; + } + break; } - return; - } - // New peer. Add it to the list of know but unjoint peers. - if (it == peers.end ()) { - peer_info_t peer_info = {false, NULL}; - it = peers.insert (std::make_pair (*tsi, peer_info)).first; - } + // New peer. Add it to the list of know but unjoint peers. + if (it == peers.end ()) { + peer_info_t peer_info = {false, NULL}; + it = peers.insert (std::make_pair (*tsi, peer_info)).first; + } - // Read the offset of the fist message in the current packet. - zmq_assert ((size_t) received >= sizeof (uint16_t)); - uint16_t offset = get_uint16 (data); - data += sizeof (uint16_t); - received -= sizeof (uint16_t); + // Read the offset of the fist message in the current packet. + zmq_assert ((size_t) received >= sizeof (uint16_t)); + uint16_t offset = get_uint16 (data); + data += sizeof (uint16_t); + received -= sizeof (uint16_t); - // Join the stream if needed. - if (!it->second.joined) { + // Join the stream if needed. + if (!it->second.joined) { - // There is no beginning of the message in current packet. - // Ignore the data. - if (offset == 0xffff) - return; + // There is no beginning of the message in current packet. + // Ignore the data. + if (offset == 0xffff) + continue; - zmq_assert (offset <= received); - zmq_assert (it->second.decoder == NULL); + zmq_assert (offset <= received); + zmq_assert (it->second.decoder == NULL); - // We have to move data to the begining of the first message. - data += offset; - received -= offset; + // We have to move data to the begining of the first message. + data += offset; + received -= offset; - // Mark the stream as joined. - it->second.joined = true; + // Mark the stream as joined. + it->second.joined = true; - // Create and connect decoder for the peer. - it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0); - it->second.decoder->set_inout (inout); - } + // Create and connect decoder for the peer. + it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0); + it->second.decoder->set_inout (inout); + } - if (received) { - // Push all the data to the decoder. // TODO: process_buffer may not process entire buffer! ssize_t processed = it->second.decoder->process_buffer (data, received); zmq_assert (processed == received); - - // Flush any messages decoder may have produced. - inout->flush (); } + + // Flush any messages decoder may have produced. + inout->flush (); } #endif -- cgit v1.2.3