From 9cee8f9c3e22f1e880988271ab1c31c92827efde Mon Sep 17 00:00:00 2001
From: Martin Sustrik <sustrik@250bpm.com>
Date: Tue, 2 Feb 2010 15:11:25 +0100
Subject: problem with PGM messages larger than 1 MTU fixed

---
 src/pgm_receiver.cpp | 104 ++++++++++++++++++++++++++-------------------------
 1 file changed, 54 insertions(+), 50 deletions(-)

diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index b611324..d0310cc 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -93,72 +93,76 @@ void zmq::pgm_receiver_t::in_event ()
     // Read data from the underlying pgm_socket.
     unsigned char *data = NULL;
     const pgm_tsi_t *tsi = NULL;
-    ssize_t received = pgm_socket.receive ((void**) &data, &tsi);
 
-    //  No data to process. This may happen if the packet received is
-    //  neither ODATA nor ODATA.
-    if (received == 0)
-        return;
-
-    //  Find the peer based on its TSI.
-    peers_t::iterator it = peers.find (*tsi);
-
-    //  Data loss. Delete decoder and mark the peer as disjoint.
-    if (received == -1) {
-        zmq_assert (it != peers.end ());
-        it->second.joined = false;
-        if (it->second.decoder != NULL) {
-            delete it->second.decoder;
-            it->second.decoder = NULL;
+    //  TODO: This loop can effectively block other engines in the same I/O
+    //  thread in the case of high load.
+    while (true) {
+
+        //  Get new batch of data.
+        ssize_t received = pgm_socket.receive ((void**) &data, &tsi);
+
+        //  No data to process. This may happen if the packet received is
+        //  neither ODATA nor ODATA.
+        if (received == 0)
+            break;
+
+        //  Find the peer based on its TSI.
+        peers_t::iterator it = peers.find (*tsi);
+
+        //  Data loss. Delete decoder and mark the peer as disjoint.
+        if (received == -1) {
+            zmq_assert (it != peers.end ());
+            it->second.joined = false;
+            if (it->second.decoder != NULL) {
+                delete it->second.decoder;
+                it->second.decoder = NULL;
+            }
+            break;
         }
-        return;
-    }
 
-    //  New peer. Add it to the list of know but unjoint peers.
-    if (it == peers.end ()) {
-        peer_info_t peer_info = {false, NULL};
-        it = peers.insert (std::make_pair (*tsi, peer_info)).first;
-    }
+        //  New peer. Add it to the list of know but unjoint peers.
+        if (it == peers.end ()) {
+            peer_info_t peer_info = {false, NULL};
+            it = peers.insert (std::make_pair (*tsi, peer_info)).first;
+        }
 
-    //  Read the offset of the fist message in the current packet.
-    zmq_assert ((size_t) received >= sizeof (uint16_t));
-    uint16_t offset = get_uint16 (data);
-    data += sizeof (uint16_t);
-    received -= sizeof (uint16_t);
+        //  Read the offset of the fist message in the current packet.
+        zmq_assert ((size_t) received >= sizeof (uint16_t));
+        uint16_t offset = get_uint16 (data);
+        data += sizeof (uint16_t);
+        received -= sizeof (uint16_t);
 
-    //  Join the stream if needed.
-    if (!it->second.joined) {
+        //  Join the stream if needed.
+        if (!it->second.joined) {
 
-        //  There is no beginning of the message in current packet.
-        //  Ignore the data.
-        if (offset == 0xffff)
-            return;
+            //  There is no beginning of the message in current packet.
+            //  Ignore the data.
+            if (offset == 0xffff)
+                continue;
 
-        zmq_assert (offset <= received);
-        zmq_assert (it->second.decoder == NULL);
+            zmq_assert (offset <= received);
+            zmq_assert (it->second.decoder == NULL);
 
-        //  We have to move data to the begining of the first message.
-        data += offset;
-        received -= offset;
+            //  We have to move data to the begining of the first message.
+            data += offset;
+            received -= offset;
 
-        //  Mark the stream as joined.
-        it->second.joined = true;
+            //  Mark the stream as joined.
+            it->second.joined = true;
 
-        //  Create and connect decoder for the peer.
-        it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0);
-        it->second.decoder->set_inout (inout);
-    }
+            //  Create and connect decoder for the peer.
+            it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0);
+            it->second.decoder->set_inout (inout);
+        }
 
-    if (received) {
-    
         //  Push all the data to the decoder.
         //  TODO: process_buffer may not process entire buffer!
         ssize_t processed = it->second.decoder->process_buffer (data, received);
         zmq_assert (processed == received);
-
-        //  Flush any messages decoder may have produced.
-        inout->flush ();
     }
+
+    //  Flush any messages decoder may have produced.
+    inout->flush ();
 }
 
 #endif
-- 
cgit v1.2.3