diff options
-rw-r--r-- | src/pgm_receiver.cpp | 50 | ||||
-rw-r--r-- | src/pgm_receiver.hpp | 9 |
2 files changed, 55 insertions, 4 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 5480030..286fcc5 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -38,7 +38,9 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, io_object_t (parent_), pgm_socket (true, options_), options (options_), - inout (NULL) + inout (NULL), + mru_decoder (NULL), + pending_bytes (0) { } @@ -76,6 +78,9 @@ void zmq::pgm_receiver_t::unplug () } peers.clear (); + mru_decoder = NULL; + pending_bytes = 0; + // Stop polling. rm_fd (socket_handle); rm_fd (pipe_handle); @@ -90,7 +95,30 @@ void zmq::pgm_receiver_t::revive () void zmq::pgm_receiver_t::resume_input () { - zmq_not_implemented (); + // It is possible that the most recently used decoder + // processed the whole buffer but failed to write + // the last message into the pipe. + if (pending_bytes == 0) { + if (mru_decoder != NULL) + mru_decoder->process_buffer (NULL, 0); + return; + } + + zmq_assert (mru_decoder != NULL); + zmq_assert (pending_ptr != NULL); + + // Ask the decoder to process remaining data. + size_t n = mru_decoder->process_buffer (pending_ptr, pending_bytes); + pending_bytes -= n; + + if (pending_bytes > 0) + return; + + // Resume polling. + set_pollin (pipe_handle); + set_pollin (socket_handle); + + in_event (); } void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_) @@ -111,6 +139,8 @@ void zmq::pgm_receiver_t::in_event () unsigned char *data = NULL; const pgm_tsi_t *tsi = NULL; + zmq_assert (pending_bytes == 0); + // TODO: This loop can effectively block other engines in the same I/O // thread in the case of high load. while (true) { @@ -130,6 +160,8 @@ void zmq::pgm_receiver_t::in_event () if (received == -1) { zmq_assert (it != peers.end ()); it->second.joined = false; + if (it->second.decoder == mru_decoder) + mru_decoder = NULL; if (it->second.decoder != NULL) { delete it->second.decoder; it->second.decoder = NULL; @@ -172,10 +204,20 @@ void zmq::pgm_receiver_t::in_event () it->second.decoder->set_inout (inout); } + mru_decoder = it->second.decoder; + // Push all the data to the decoder. - // TODO: process_buffer may not process entire buffer! ssize_t processed = it->second.decoder->process_buffer (data, received); - zmq_assert (processed == received); + if (processed < received) { + // Save some state so we can resume the decoding process later. + pending_bytes = received - processed; + pending_ptr = data + processed; + // Stop polling. + reset_pollin (pipe_handle); + reset_pollin (socket_handle); + + break; + } } // Flush any messages decoder may have produced. diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index dd9402f..becdfce 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -98,6 +98,15 @@ namespace zmq // Parent session. i_inout *inout; + // Most recently used decoder. + zmq_decoder_t *mru_decoder; + + // Number of bytes not consumed by the decoder due to pipe overflow. + size_t pending_bytes; + + // Pointer to data still waiting to be processed by the decoder. + unsigned char *pending_ptr; + // Poll handle associated with PGM socket. handle_t socket_handle; |