diff options
| -rw-r--r-- | src/pgm_receiver.cpp | 50 | ||||
| -rw-r--r-- | src/pgm_receiver.hpp | 9 | 
2 files changed, 55 insertions, 4 deletions
| diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 5480030..286fcc5 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -38,7 +38,9 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,      io_object_t (parent_),      pgm_socket (true, options_),      options (options_), -    inout (NULL) +    inout (NULL), +    mru_decoder (NULL), +    pending_bytes (0)  {  } @@ -76,6 +78,9 @@ void zmq::pgm_receiver_t::unplug ()      }      peers.clear (); +    mru_decoder = NULL; +    pending_bytes = 0; +      //  Stop polling.      rm_fd (socket_handle);      rm_fd (pipe_handle); @@ -90,7 +95,30 @@ void zmq::pgm_receiver_t::revive ()  void zmq::pgm_receiver_t::resume_input ()  { -   zmq_not_implemented (); +    //  It is possible that the most recently used decoder +    //  processed the whole buffer but failed to write +    //  the last message into the pipe. +    if (pending_bytes == 0) { +        if (mru_decoder != NULL) +            mru_decoder->process_buffer (NULL, 0); +        return; +    } + +    zmq_assert (mru_decoder != NULL); +    zmq_assert (pending_ptr != NULL); + +    //  Ask the decoder to process remaining data. +    size_t n = mru_decoder->process_buffer (pending_ptr, pending_bytes); +    pending_bytes -= n; + +    if (pending_bytes > 0) +        return; + +    //  Resume polling. +    set_pollin (pipe_handle); +    set_pollin (socket_handle); + +    in_event ();  }  void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_) @@ -111,6 +139,8 @@ void zmq::pgm_receiver_t::in_event ()      unsigned char *data = NULL;      const pgm_tsi_t *tsi = NULL; +    zmq_assert (pending_bytes == 0); +      //  TODO: This loop can effectively block other engines in the same I/O      //  thread in the case of high load.      while (true) { @@ -130,6 +160,8 @@ void zmq::pgm_receiver_t::in_event ()          if (received == -1) {              zmq_assert (it != peers.end ());              it->second.joined = false; +            if (it->second.decoder == mru_decoder) +                mru_decoder = NULL;              if (it->second.decoder != NULL) {                  delete it->second.decoder;                  it->second.decoder = NULL; @@ -172,10 +204,20 @@ void zmq::pgm_receiver_t::in_event ()              it->second.decoder->set_inout (inout);          } +        mru_decoder = it->second.decoder; +          //  Push all the data to the decoder. -        //  TODO: process_buffer may not process entire buffer!          ssize_t processed = it->second.decoder->process_buffer (data, received); -        zmq_assert (processed == received); +        if (processed < received) { +            //  Save some state so we can resume the decoding process later. +            pending_bytes = received - processed; +            pending_ptr = data + processed; +            //  Stop polling. +            reset_pollin (pipe_handle); +            reset_pollin (socket_handle); + +            break; +        }      }      //  Flush any messages decoder may have produced. diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index dd9402f..becdfce 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -98,6 +98,15 @@ namespace zmq          //  Parent session.          i_inout *inout; +        //  Most recently used decoder. +        zmq_decoder_t *mru_decoder; + +        //  Number of bytes not consumed by the decoder due to pipe overflow. +        size_t pending_bytes; + +        //  Pointer to data still waiting to be processed by the decoder. +        unsigned char *pending_ptr; +          //  Poll handle associated with PGM socket.          handle_t socket_handle; | 
