diff options
| -rw-r--r-- | src/pipe.cpp | 32 | ||||
| -rw-r--r-- | src/pipe.hpp | 6 | 
2 files changed, 28 insertions, 10 deletions
| diff --git a/src/pipe.cpp b/src/pipe.cpp index 0e15dce..31b9199 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -21,11 +21,11 @@  #include "pipe.hpp" -zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, +zmq::reader_t::reader_t (object_t *parent_,        uint64_t hwm_, uint64_t lwm_) :      object_t (parent_), -    pipe (pipe_), -    peer (&pipe_->writer), +    pipe (NULL), +    peer (NULL),      hwm (hwm_),      lwm (lwm_),      endpoint (NULL) @@ -36,6 +36,13 @@ zmq::reader_t::~reader_t ()  {  } +void zmq::reader_t::set_pipe (pipe_t *pipe_) +{ +    zmq_assert (!pipe); +    pipe = pipe_; +    peer = &pipe_->writer; +} +  bool zmq::reader_t::check_read ()  {      //  Check if there's an item in the pipe. @@ -94,11 +101,11 @@ void zmq::reader_t::process_pipe_term_ack ()      delete pipe;  } -zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, +zmq::writer_t::writer_t (object_t *parent_,        uint64_t hwm_, uint64_t lwm_) :      object_t (parent_), -    pipe (pipe_), -    peer (&pipe_->reader), +    pipe (NULL), +    peer (NULL),      hwm (hwm_),      lwm (lwm_),      endpoint (NULL) @@ -114,6 +121,13 @@ zmq::writer_t::~writer_t ()  {  } +void zmq::writer_t::set_pipe (pipe_t *pipe_) +{ +    zmq_assert (!pipe); +    pipe = pipe_; +    peer = &pipe_->reader; +} +  bool zmq::writer_t::check_write (uint64_t size_)  {      //  TODO: Check whether hwm is exceeded. @@ -161,9 +175,11 @@ void zmq::writer_t::process_pipe_term ()  zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,        uint64_t hwm_, uint64_t lwm_) : -    reader (reader_parent_, this, hwm_, lwm_), -    writer (writer_parent_, this, hwm_, lwm_) +    reader (reader_parent_, hwm_, lwm_), +    writer (writer_parent_, hwm_, lwm_)  { +    reader.set_pipe (this); +    writer.set_pipe (this);      reader.register_pipe (this);  } diff --git a/src/pipe.hpp b/src/pipe.hpp index ecbce7d..9083ccd 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -36,10 +36,11 @@ namespace zmq      {      public: -        reader_t (class object_t *parent_, class pipe_t *pipe_, +        reader_t (class object_t *parent_,              uint64_t hwm_, uint64_t lwm_);          ~reader_t (); +        void set_pipe (class pipe_t *pipe_);          void set_endpoint (i_endpoint *endpoint_);          //  Returns true if there is at least one message to read in the pipe. @@ -83,10 +84,11 @@ namespace zmq      {      public: -        writer_t (class object_t *parent_, class pipe_t *pipe_, +        writer_t (class object_t *parent_,              uint64_t hwm_, uint64_t lwm_);          ~writer_t (); +        void set_pipe (class pipe_t *pipe_);          void set_endpoint (i_endpoint *endpoint_);          //  Checks whether message with specified size can be written to the | 
