summaryrefslogtreecommitdiff
path: root/src/pgm_receiver.cpp
diff options
context:
space:
mode:
authormalosek <malosek@fastmq.com>2009-09-16 10:35:23 +0200
committermalosek <malosek@fastmq.com>2009-09-16 10:35:23 +0200
commit9fbdcc7940823634d82f51d2b124ccfbca6e9b17 (patch)
tree702bcac75f2d0181a13f2223c4b0faa886be9be3 /src/pgm_receiver.cpp
parent969522bbf55467b6f6e8113be28451d087060843 (diff)
removed reset method from zmq_decoder_t
Diffstat (limited to 'src/pgm_receiver.cpp')
-rw-r--r--src/pgm_receiver.cpp25
1 files changed, 15 insertions, 10 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index f34ecf0..6ea310c 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -46,6 +46,7 @@
zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
const options_t &options_, const char *session_name_) :
io_object_t (parent_),
+ decoder (NULL),
pgm_socket (true, options_),
options (options_),
session_name (session_name_),
@@ -56,10 +57,15 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
zmq::pgm_receiver_t::~pgm_receiver_t ()
{
+ if (decoder)
+ delete decoder;
}
int zmq::pgm_receiver_t::init (const char *network_)
{
+ decoder = new zmq_decoder_t;
+ zmq_assert (decoder);
+
return pgm_socket.init (network_);
}
@@ -69,7 +75,7 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_)
int socket_fd;
int waiting_pipe_fd;
- decoder.set_inout (inout_);
+ decoder->set_inout (inout_);
// Fill socket_fd and waiting_pipe_fd from PGM transport
pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
@@ -91,7 +97,7 @@ void zmq::pgm_receiver_t::unplug ()
{
rm_fd (socket_handle);
rm_fd (pipe_handle);
- decoder.set_inout (NULL);
+ decoder->set_inout (NULL);
inout = NULL;
}
@@ -105,9 +111,14 @@ void zmq::pgm_receiver_t::reconnect ()
// Save inout ptr.
i_inout *inout_tmp = inout;
+ // PGM receiver is not joined anymore.
+ joined = false;
+
// Unplug - plug PGM transport.
unplug ();
- decoder.reset ();
+ delete decoder;
+ decoder = new zmq_decoder_t;
+ zmq_assert (decoder);
plug (inout_tmp);
}
@@ -121,7 +132,7 @@ void zmq::pgm_receiver_t::in_event ()
while ((nbytes = receive_with_offset (&data_with_offset)) > 0) {
// Push all the data to the decoder.
- decoder.write ((unsigned char*)data_with_offset, nbytes);
+ decoder->write ((unsigned char*)data_with_offset, nbytes);
}
// Flush any messages decoder may have produced to the dispatcher.
@@ -130,12 +141,6 @@ void zmq::pgm_receiver_t::in_event ()
// Data loss detected.
if (nbytes == -1) {
- // Throw message in progress from decoder
- decoder.reset ();
-
- // PGM receive is not joined anymore.
- joined = false;
-
// Recreate PGM transport.
reconnect ();
}