summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/pipe.cpp32
-rw-r--r--src/pipe.hpp6
2 files changed, 28 insertions, 10 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 0e15dce..31b9199 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -21,11 +21,11 @@
#include "pipe.hpp"
-zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_,
+zmq::reader_t::reader_t (object_t *parent_,
uint64_t hwm_, uint64_t lwm_) :
object_t (parent_),
- pipe (pipe_),
- peer (&pipe_->writer),
+ pipe (NULL),
+ peer (NULL),
hwm (hwm_),
lwm (lwm_),
endpoint (NULL)
@@ -36,6 +36,13 @@ zmq::reader_t::~reader_t ()
{
}
+void zmq::reader_t::set_pipe (pipe_t *pipe_)
+{
+ zmq_assert (!pipe);
+ pipe = pipe_;
+ peer = &pipe_->writer;
+}
+
bool zmq::reader_t::check_read ()
{
// Check if there's an item in the pipe.
@@ -94,11 +101,11 @@ void zmq::reader_t::process_pipe_term_ack ()
delete pipe;
}
-zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_,
+zmq::writer_t::writer_t (object_t *parent_,
uint64_t hwm_, uint64_t lwm_) :
object_t (parent_),
- pipe (pipe_),
- peer (&pipe_->reader),
+ pipe (NULL),
+ peer (NULL),
hwm (hwm_),
lwm (lwm_),
endpoint (NULL)
@@ -114,6 +121,13 @@ zmq::writer_t::~writer_t ()
{
}
+void zmq::writer_t::set_pipe (pipe_t *pipe_)
+{
+ zmq_assert (!pipe);
+ pipe = pipe_;
+ peer = &pipe_->reader;
+}
+
bool zmq::writer_t::check_write (uint64_t size_)
{
// TODO: Check whether hwm is exceeded.
@@ -161,9 +175,11 @@ void zmq::writer_t::process_pipe_term ()
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, uint64_t lwm_) :
- reader (reader_parent_, this, hwm_, lwm_),
- writer (writer_parent_, this, hwm_, lwm_)
+ reader (reader_parent_, hwm_, lwm_),
+ writer (writer_parent_, hwm_, lwm_)
{
+ reader.set_pipe (this);
+ writer.set_pipe (this);
reader.register_pipe (this);
}
diff --git a/src/pipe.hpp b/src/pipe.hpp
index ecbce7d..9083ccd 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -36,10 +36,11 @@ namespace zmq
{
public:
- reader_t (class object_t *parent_, class pipe_t *pipe_,
+ reader_t (class object_t *parent_,
uint64_t hwm_, uint64_t lwm_);
~reader_t ();
+ void set_pipe (class pipe_t *pipe_);
void set_endpoint (i_endpoint *endpoint_);
// Returns true if there is at least one message to read in the pipe.
@@ -83,10 +84,11 @@ namespace zmq
{
public:
- writer_t (class object_t *parent_, class pipe_t *pipe_,
+ writer_t (class object_t *parent_,
uint64_t hwm_, uint64_t lwm_);
~writer_t ();
+ void set_pipe (class pipe_t *pipe_);
void set_endpoint (i_endpoint *endpoint_);
// Checks whether message with specified size can be written to the