From cb09c6951e2c4405318b422a1f9213af3e4b6b8a Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 28 Aug 2009 16:51:46 +0200 Subject: pipe deallocation added --- src/pipe.cpp | 81 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 78 insertions(+), 3 deletions(-) (limited to 'src/pipe.cpp') diff --git a/src/pipe.cpp b/src/pipe.cpp index 5016631..3748ae9 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -19,6 +19,8 @@ #include +#include <../include/zmq.h> + #include "pipe.hpp" zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, @@ -39,9 +41,21 @@ zmq::reader_t::~reader_t () bool zmq::reader_t::read (zmq_msg_t *msg_) { - return pipe->read (msg_); + if (!pipe->read (msg_)) + return false; + + // If delimiter was read, start termination process of the pipe. + unsigned char *offset = 0; + if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) { + if (endpoint) + endpoint->detach_inpipe (this); + term (); + return false; + } // TODO: Adjust the size of the pipe. + + return true; } void zmq::reader_t::set_endpoint (i_endpoint *endpoint_) @@ -59,19 +73,48 @@ int zmq::reader_t::get_index () return index; } +void zmq::reader_t::term () +{ + endpoint = NULL; + send_pipe_term (peer); +} + void zmq::reader_t::process_revive () { endpoint->revive (this); } +void zmq::reader_t::process_pipe_term_ack () +{ + peer = NULL; + delete pipe; +} + zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, uint64_t hwm_, uint64_t lwm_) : object_t (parent_), pipe (pipe_), peer (&pipe_->reader), hwm (hwm_), - lwm (lwm_) + lwm (lwm_), + index (-1), + endpoint (NULL) +{ +} + +void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) +{ + endpoint = endpoint_; +} + +void zmq::writer_t::set_index (int index_) +{ + index = index_; +} + +int zmq::writer_t::get_index () { + return index; } zmq::writer_t::~writer_t () @@ -99,14 +142,46 @@ void zmq::writer_t::flush () send_revive (peer); } +void zmq::writer_t::term () +{ + endpoint = NULL; + + // Push delimiter into the pipe. + // Trick the compiler to belive that the tag is a valid pointer. + zmq_msg_t msg; + const unsigned char *offset = 0; + msg.content = (void*) (offset + ZMQ_DELIMITER); + msg.shared = false; + pipe->write (msg); + pipe->flush (); +} + +void zmq::writer_t::process_pipe_term () +{ + if (endpoint) + endpoint->detach_outpipe (this); + + reader_t *p = peer; + peer = NULL; + send_pipe_term_ack (p); +} + zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_) : reader (reader_parent_, this, hwm_, lwm_), writer (writer_parent_, this, hwm_, lwm_) { + reader.register_pipe (this); } zmq::pipe_t::~pipe_t () { + // Deallocate all the unread messages in the pipe. We have to do it by + // hand because zmq_msg_t is a POD, not a class, so there's no associated + // destructor. + zmq_msg_t msg; + while (read (&msg)) + zmq_msg_close (&msg); + + reader.unregister_pipe (this); } - -- cgit v1.2.3