summaryrefslogtreecommitdiff
path: root/src/pipe.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-28 16:51:46 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-28 16:51:46 +0200
commitcb09c6951e2c4405318b422a1f9213af3e4b6b8a (patch)
treefb5d4dfd6a71745e885b2501f19cfbbb38c6f441 /src/pipe.cpp
parent2dd501651592baa7f9e49f52e1321ae2b9b4e126 (diff)
pipe deallocation added
Diffstat (limited to 'src/pipe.cpp')
-rw-r--r--src/pipe.cpp81
1 files changed, 78 insertions, 3 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 5016631..3748ae9 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -19,6 +19,8 @@
#include <pthread.h>
+#include <../include/zmq.h>
+
#include "pipe.hpp"
zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_,
@@ -39,9 +41,21 @@ zmq::reader_t::~reader_t ()
bool zmq::reader_t::read (zmq_msg_t *msg_)
{
- return pipe->read (msg_);
+ if (!pipe->read (msg_))
+ return false;
+
+ // If delimiter was read, start termination process of the pipe.
+ unsigned char *offset = 0;
+ if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) {
+ if (endpoint)
+ endpoint->detach_inpipe (this);
+ term ();
+ return false;
+ }
// TODO: Adjust the size of the pipe.
+
+ return true;
}
void zmq::reader_t::set_endpoint (i_endpoint *endpoint_)
@@ -59,19 +73,48 @@ int zmq::reader_t::get_index ()
return index;
}
+void zmq::reader_t::term ()
+{
+ endpoint = NULL;
+ send_pipe_term (peer);
+}
+
void zmq::reader_t::process_revive ()
{
endpoint->revive (this);
}
+void zmq::reader_t::process_pipe_term_ack ()
+{
+ peer = NULL;
+ delete pipe;
+}
+
zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_,
uint64_t hwm_, uint64_t lwm_) :
object_t (parent_),
pipe (pipe_),
peer (&pipe_->reader),
hwm (hwm_),
- lwm (lwm_)
+ lwm (lwm_),
+ index (-1),
+ endpoint (NULL)
+{
+}
+
+void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
+{
+ endpoint = endpoint_;
+}
+
+void zmq::writer_t::set_index (int index_)
+{
+ index = index_;
+}
+
+int zmq::writer_t::get_index ()
{
+ return index;
}
zmq::writer_t::~writer_t ()
@@ -99,14 +142,46 @@ void zmq::writer_t::flush ()
send_revive (peer);
}
+void zmq::writer_t::term ()
+{
+ endpoint = NULL;
+
+ // Push delimiter into the pipe.
+ // Trick the compiler to belive that the tag is a valid pointer.
+ zmq_msg_t msg;
+ const unsigned char *offset = 0;
+ msg.content = (void*) (offset + ZMQ_DELIMITER);
+ msg.shared = false;
+ pipe->write (msg);
+ pipe->flush ();
+}
+
+void zmq::writer_t::process_pipe_term ()
+{
+ if (endpoint)
+ endpoint->detach_outpipe (this);
+
+ reader_t *p = peer;
+ peer = NULL;
+ send_pipe_term_ack (p);
+}
+
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, uint64_t lwm_) :
reader (reader_parent_, this, hwm_, lwm_),
writer (writer_parent_, this, hwm_, lwm_)
{
+ reader.register_pipe (this);
}
zmq::pipe_t::~pipe_t ()
{
+ // Deallocate all the unread messages in the pipe. We have to do it by
+ // hand because zmq_msg_t is a POD, not a class, so there's no associated
+ // destructor.
+ zmq_msg_t msg;
+ while (read (&msg))
+ zmq_msg_close (&msg);
+
+ reader.unregister_pipe (this);
}
-