summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/pgm_receiver.cpp104
1 files changed, 54 insertions, 50 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index b611324..d0310cc 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -93,72 +93,76 @@ void zmq::pgm_receiver_t::in_event ()
// Read data from the underlying pgm_socket.
unsigned char *data = NULL;
const pgm_tsi_t *tsi = NULL;
- ssize_t received = pgm_socket.receive ((void**) &data, &tsi);
- // No data to process. This may happen if the packet received is
- // neither ODATA nor ODATA.
- if (received == 0)
- return;
-
- // Find the peer based on its TSI.
- peers_t::iterator it = peers.find (*tsi);
-
- // Data loss. Delete decoder and mark the peer as disjoint.
- if (received == -1) {
- zmq_assert (it != peers.end ());
- it->second.joined = false;
- if (it->second.decoder != NULL) {
- delete it->second.decoder;
- it->second.decoder = NULL;
+ // TODO: This loop can effectively block other engines in the same I/O
+ // thread in the case of high load.
+ while (true) {
+
+ // Get new batch of data.
+ ssize_t received = pgm_socket.receive ((void**) &data, &tsi);
+
+ // No data to process. This may happen if the packet received is
+ // neither ODATA nor ODATA.
+ if (received == 0)
+ break;
+
+ // Find the peer based on its TSI.
+ peers_t::iterator it = peers.find (*tsi);
+
+ // Data loss. Delete decoder and mark the peer as disjoint.
+ if (received == -1) {
+ zmq_assert (it != peers.end ());
+ it->second.joined = false;
+ if (it->second.decoder != NULL) {
+ delete it->second.decoder;
+ it->second.decoder = NULL;
+ }
+ break;
}
- return;
- }
- // New peer. Add it to the list of know but unjoint peers.
- if (it == peers.end ()) {
- peer_info_t peer_info = {false, NULL};
- it = peers.insert (std::make_pair (*tsi, peer_info)).first;
- }
+ // New peer. Add it to the list of know but unjoint peers.
+ if (it == peers.end ()) {
+ peer_info_t peer_info = {false, NULL};
+ it = peers.insert (std::make_pair (*tsi, peer_info)).first;
+ }
- // Read the offset of the fist message in the current packet.
- zmq_assert ((size_t) received >= sizeof (uint16_t));
- uint16_t offset = get_uint16 (data);
- data += sizeof (uint16_t);
- received -= sizeof (uint16_t);
+ // Read the offset of the fist message in the current packet.
+ zmq_assert ((size_t) received >= sizeof (uint16_t));
+ uint16_t offset = get_uint16 (data);
+ data += sizeof (uint16_t);
+ received -= sizeof (uint16_t);
- // Join the stream if needed.
- if (!it->second.joined) {
+ // Join the stream if needed.
+ if (!it->second.joined) {
- // There is no beginning of the message in current packet.
- // Ignore the data.
- if (offset == 0xffff)
- return;
+ // There is no beginning of the message in current packet.
+ // Ignore the data.
+ if (offset == 0xffff)
+ continue;
- zmq_assert (offset <= received);
- zmq_assert (it->second.decoder == NULL);
+ zmq_assert (offset <= received);
+ zmq_assert (it->second.decoder == NULL);
- // We have to move data to the begining of the first message.
- data += offset;
- received -= offset;
+ // We have to move data to the begining of the first message.
+ data += offset;
+ received -= offset;
- // Mark the stream as joined.
- it->second.joined = true;
+ // Mark the stream as joined.
+ it->second.joined = true;
- // Create and connect decoder for the peer.
- it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0);
- it->second.decoder->set_inout (inout);
- }
+ // Create and connect decoder for the peer.
+ it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0);
+ it->second.decoder->set_inout (inout);
+ }
- if (received) {
-
// Push all the data to the decoder.
// TODO: process_buffer may not process entire buffer!
ssize_t processed = it->second.decoder->process_buffer (data, received);
zmq_assert (processed == received);
-
- // Flush any messages decoder may have produced.
- inout->flush ();
}
+
+ // Flush any messages decoder may have produced.
+ inout->flush ();
}
#endif