summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-11-26 12:01:26 +0100
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-11-26 12:01:26 +0100
commit8d85638f77ec0aa886170ba6bb49763ef165393b (patch)
tree438957d3dac3fb738c26426904817377025d0094
parent92aa9e94e21b652839faa3dda27c67571bad315d (diff)
memory leak in message encoder fixed
-rw-r--r--AUTHORS1
-rw-r--r--perf/c/local_thr.c3
-rw-r--r--src/session.cpp4
-rw-r--r--src/zmq_decoder.cpp8
-rw-r--r--src/zmq_encoder.cpp7
-rw-r--r--src/zmq_listener_init.cpp1
6 files changed, 17 insertions, 7 deletions
diff --git a/AUTHORS b/AUTHORS
index 13f0076..b37bffa 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -9,6 +9,7 @@ Dirk O. Kaar
Erich Heine
Frank Denis
George Neill
+Jon Dyte
Martin Hurton
Martin Lucina
Martin Sustrik
diff --git a/perf/c/local_thr.c b/perf/c/local_thr.c
index c97af11..f785e80 100644
--- a/perf/c/local_thr.c
+++ b/perf/c/local_thr.c
@@ -79,6 +79,9 @@ int main (int argc, char *argv [])
if (elapsed == 0)
elapsed = 1;
+ rc = zmq_msg_close (&msg);
+ assert (rc == 0);
+
throughput = (unsigned long)
((double) message_count / (double) elapsed * 1000000);
megabits = (double) (throughput * message_size * 8) / 1000000;
diff --git a/src/session.cpp b/src/session.cpp
index f62de27..87b47b0 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -51,10 +51,6 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
bool zmq::session_t::write (::zmq_msg_t *msg_)
{
- // The communication is unidirectional.
- // We don't expect any message to arrive.
- zmq_assert (out_pipe);
-
if (out_pipe->write (msg_)) {
zmq_msg_init (msg_);
return true;
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
index 53811a1..8040f21 100644
--- a/src/zmq_decoder.cpp
+++ b/src/zmq_decoder.cpp
@@ -51,6 +51,9 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
else {
// TODO: Handle over-sized message decently.
+ // in_progress is initialised at this point so in theory we should
+ // close it before calling zmq_msg_init_size, however, it's a 0-byte
+ // message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, *tmpbuf);
errno_assert (rc == 0);
@@ -67,6 +70,9 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
size_t size = (size_t) get_uint64 (tmpbuf);
// TODO: Handle over-sized message decently.
+ // in_progress is initialised at this point so in theory we should
+ // close it before calling zmq_msg_init_size, however, it's a 0-byte
+ // message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, size);
errno_assert (rc == 0);
@@ -78,7 +84,7 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
bool zmq::zmq_decoder_t::message_ready ()
{
// Message is completely read. Push it further and start reading
- // new message.
+ // new message. (in_progress is a 0-byte message after this point.)
if (!destination || !destination->write (&in_progress))
return false;
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index 44b919b..180bda7 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -50,12 +50,17 @@ bool zmq::zmq_encoder_t::size_ready ()
bool zmq::zmq_encoder_t::message_ready ()
{
+ // Destroy content of the old message.
+ zmq_msg_close(&in_progress);
+
// Read new message from the dispatcher. If there is none, return false.
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
- if (!source || !source->read (&in_progress))
+ if (!source || !source->read (&in_progress)) {
+ zmq_msg_init (&in_progress);
return false;
+ }
size_t size = zmq_msg_size (&in_progress);
diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp
index eec41c7..0d9488d 100644
--- a/src/zmq_listener_init.cpp
+++ b/src/zmq_listener_init.cpp
@@ -55,7 +55,6 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
has_peer_identity = true;
peer_identity.assign ((const char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
-
return true;
}