diff options
-rw-r--r-- | src/pgm_receiver.cpp | 25 | ||||
-rw-r--r-- | src/pgm_receiver.hpp | 2 | ||||
-rw-r--r-- | src/zmq_decoder.cpp | 10 | ||||
-rw-r--r-- | src/zmq_decoder.hpp | 2 |
4 files changed, 16 insertions, 23 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index f34ecf0..6ea310c 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -46,6 +46,7 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, const options_t &options_, const char *session_name_) : io_object_t (parent_), + decoder (NULL), pgm_socket (true, options_), options (options_), session_name (session_name_), @@ -56,10 +57,15 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, zmq::pgm_receiver_t::~pgm_receiver_t () { + if (decoder) + delete decoder; } int zmq::pgm_receiver_t::init (const char *network_) { + decoder = new zmq_decoder_t; + zmq_assert (decoder); + return pgm_socket.init (network_); } @@ -69,7 +75,7 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_) int socket_fd; int waiting_pipe_fd; - decoder.set_inout (inout_); + decoder->set_inout (inout_); // Fill socket_fd and waiting_pipe_fd from PGM transport pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd); @@ -91,7 +97,7 @@ void zmq::pgm_receiver_t::unplug () { rm_fd (socket_handle); rm_fd (pipe_handle); - decoder.set_inout (NULL); + decoder->set_inout (NULL); inout = NULL; } @@ -105,9 +111,14 @@ void zmq::pgm_receiver_t::reconnect () // Save inout ptr. i_inout *inout_tmp = inout; + // PGM receiver is not joined anymore. + joined = false; + // Unplug - plug PGM transport. unplug (); - decoder.reset (); + delete decoder; + decoder = new zmq_decoder_t; + zmq_assert (decoder); plug (inout_tmp); } @@ -121,7 +132,7 @@ void zmq::pgm_receiver_t::in_event () while ((nbytes = receive_with_offset (&data_with_offset)) > 0) { // Push all the data to the decoder. - decoder.write ((unsigned char*)data_with_offset, nbytes); + decoder->write ((unsigned char*)data_with_offset, nbytes); } // Flush any messages decoder may have produced to the dispatcher. @@ -130,12 +141,6 @@ void zmq::pgm_receiver_t::in_event () // Data loss detected. if (nbytes == -1) { - // Throw message in progress from decoder - decoder.reset (); - - // PGM receive is not joined anymore. - joined = false; - // Recreate PGM transport. reconnect (); } diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index ce9fa1a..53d5340 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -64,7 +64,7 @@ namespace zmq ssize_t receive_with_offset (void **data_); // Message decoder. - zmq_decoder_t decoder; + zmq_decoder_t *decoder; // PGM socket. pgm_socket_t pgm_socket; diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index ccaad0d..53811a1 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -36,16 +36,6 @@ zmq::zmq_decoder_t::~zmq_decoder_t () zmq_msg_close (&in_progress); } -void zmq::zmq_decoder_t::reset () -{ - // Free and reinit message buffer. - zmq_msg_close (&in_progress); - zmq_msg_init (&in_progress); - - // Restart the FSM. - next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); -} - void zmq::zmq_decoder_t::set_inout (i_inout *destination_) { destination = destination_; diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp index 6bc373b..16b5312 100644 --- a/src/zmq_decoder.hpp +++ b/src/zmq_decoder.hpp @@ -37,8 +37,6 @@ namespace zmq void set_inout (struct i_inout *destination_); - // Clears any partially decoded messages. - void reset (); private: bool one_byte_size_ready (); |