summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Hurton <hurtonm@gmail.com>2010-03-02 09:02:40 +0100
committerMartin Hurton <hurtonm@gmail.com>2010-03-12 11:07:38 +0100
commitf9521c6b6a35103c03b742a311a34d7b04da0b84 (patch)
tree77d0bd5daa1a2b896c952fe3c20e0e17471bfee4 /src
parent61ee6fae536a8000be87b5aaf271f6519a3b7d3f (diff)
PGM: implement flow control
Diffstat (limited to 'src')
-rw-r--r--src/pgm_receiver.cpp50
-rw-r--r--src/pgm_receiver.hpp9
2 files changed, 55 insertions, 4 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 5480030..286fcc5 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -38,7 +38,9 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
io_object_t (parent_),
pgm_socket (true, options_),
options (options_),
- inout (NULL)
+ inout (NULL),
+ mru_decoder (NULL),
+ pending_bytes (0)
{
}
@@ -76,6 +78,9 @@ void zmq::pgm_receiver_t::unplug ()
}
peers.clear ();
+ mru_decoder = NULL;
+ pending_bytes = 0;
+
// Stop polling.
rm_fd (socket_handle);
rm_fd (pipe_handle);
@@ -90,7 +95,30 @@ void zmq::pgm_receiver_t::revive ()
void zmq::pgm_receiver_t::resume_input ()
{
- zmq_not_implemented ();
+ // It is possible that the most recently used decoder
+ // processed the whole buffer but failed to write
+ // the last message into the pipe.
+ if (pending_bytes == 0) {
+ if (mru_decoder != NULL)
+ mru_decoder->process_buffer (NULL, 0);
+ return;
+ }
+
+ zmq_assert (mru_decoder != NULL);
+ zmq_assert (pending_ptr != NULL);
+
+ // Ask the decoder to process remaining data.
+ size_t n = mru_decoder->process_buffer (pending_ptr, pending_bytes);
+ pending_bytes -= n;
+
+ if (pending_bytes > 0)
+ return;
+
+ // Resume polling.
+ set_pollin (pipe_handle);
+ set_pollin (socket_handle);
+
+ in_event ();
}
void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_)
@@ -111,6 +139,8 @@ void zmq::pgm_receiver_t::in_event ()
unsigned char *data = NULL;
const pgm_tsi_t *tsi = NULL;
+ zmq_assert (pending_bytes == 0);
+
// TODO: This loop can effectively block other engines in the same I/O
// thread in the case of high load.
while (true) {
@@ -130,6 +160,8 @@ void zmq::pgm_receiver_t::in_event ()
if (received == -1) {
zmq_assert (it != peers.end ());
it->second.joined = false;
+ if (it->second.decoder == mru_decoder)
+ mru_decoder = NULL;
if (it->second.decoder != NULL) {
delete it->second.decoder;
it->second.decoder = NULL;
@@ -172,10 +204,20 @@ void zmq::pgm_receiver_t::in_event ()
it->second.decoder->set_inout (inout);
}
+ mru_decoder = it->second.decoder;
+
// Push all the data to the decoder.
- // TODO: process_buffer may not process entire buffer!
ssize_t processed = it->second.decoder->process_buffer (data, received);
- zmq_assert (processed == received);
+ if (processed < received) {
+ // Save some state so we can resume the decoding process later.
+ pending_bytes = received - processed;
+ pending_ptr = data + processed;
+ // Stop polling.
+ reset_pollin (pipe_handle);
+ reset_pollin (socket_handle);
+
+ break;
+ }
}
// Flush any messages decoder may have produced.
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index dd9402f..becdfce 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -98,6 +98,15 @@ namespace zmq
// Parent session.
i_inout *inout;
+ // Most recently used decoder.
+ zmq_decoder_t *mru_decoder;
+
+ // Number of bytes not consumed by the decoder due to pipe overflow.
+ size_t pending_bytes;
+
+ // Pointer to data still waiting to be processed by the decoder.
+ unsigned char *pending_ptr;
+
// Poll handle associated with PGM socket.
handle_t socket_handle;