summaryrefslogtreecommitdiff
path: root/src/pipe.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/pipe.cpp')
-rw-r--r--src/pipe.cpp32
1 files changed, 24 insertions, 8 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 0e15dce..31b9199 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -21,11 +21,11 @@
#include "pipe.hpp"
-zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_,
+zmq::reader_t::reader_t (object_t *parent_,
uint64_t hwm_, uint64_t lwm_) :
object_t (parent_),
- pipe (pipe_),
- peer (&pipe_->writer),
+ pipe (NULL),
+ peer (NULL),
hwm (hwm_),
lwm (lwm_),
endpoint (NULL)
@@ -36,6 +36,13 @@ zmq::reader_t::~reader_t ()
{
}
+void zmq::reader_t::set_pipe (pipe_t *pipe_)
+{
+ zmq_assert (!pipe);
+ pipe = pipe_;
+ peer = &pipe_->writer;
+}
+
bool zmq::reader_t::check_read ()
{
// Check if there's an item in the pipe.
@@ -94,11 +101,11 @@ void zmq::reader_t::process_pipe_term_ack ()
delete pipe;
}
-zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_,
+zmq::writer_t::writer_t (object_t *parent_,
uint64_t hwm_, uint64_t lwm_) :
object_t (parent_),
- pipe (pipe_),
- peer (&pipe_->reader),
+ pipe (NULL),
+ peer (NULL),
hwm (hwm_),
lwm (lwm_),
endpoint (NULL)
@@ -114,6 +121,13 @@ zmq::writer_t::~writer_t ()
{
}
+void zmq::writer_t::set_pipe (pipe_t *pipe_)
+{
+ zmq_assert (!pipe);
+ pipe = pipe_;
+ peer = &pipe_->reader;
+}
+
bool zmq::writer_t::check_write (uint64_t size_)
{
// TODO: Check whether hwm is exceeded.
@@ -161,9 +175,11 @@ void zmq::writer_t::process_pipe_term ()
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, uint64_t lwm_) :
- reader (reader_parent_, this, hwm_, lwm_),
- writer (writer_parent_, this, hwm_, lwm_)
+ reader (reader_parent_, hwm_, lwm_),
+ writer (writer_parent_, hwm_, lwm_)
{
+ reader.set_pipe (this);
+ writer.set_pipe (this);
reader.register_pipe (this);
}