diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/p2p.cpp | 56 | ||||
| -rw-r--r-- | src/p2p.hpp | 5 | 
2 files changed, 49 insertions, 12 deletions
diff --git a/src/p2p.cpp b/src/p2p.cpp index ae2424a..9a5e186 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -21,9 +21,13 @@  #include "p2p.hpp"  #include "err.hpp" +#include "pipe.hpp"  zmq::p2p_t::p2p_t (class app_thread_t *parent_) : -    socket_base_t (parent_) +    socket_base_t (parent_), +    inpipe (NULL), +    outpipe (NULL), +    alive (true)  {      options.requires_in = true;      options.requires_out = true; @@ -31,32 +35,42 @@ zmq::p2p_t::p2p_t (class app_thread_t *parent_) :  zmq::p2p_t::~p2p_t ()  { +    if (inpipe) +        inpipe->term (); +    if (outpipe) +        outpipe->term ();  }  void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_,      class writer_t *outpipe_)  { -    zmq_assert (false); +    zmq_assert (!inpipe && !outpipe); +    inpipe = inpipe_; +    outpipe = outpipe_;  }  void zmq::p2p_t::xdetach_inpipe (class reader_t *pipe_)  { -    zmq_assert (false); +    zmq_assert (pipe_ == inpipe); +    inpipe = NULL;  }  void zmq::p2p_t::xdetach_outpipe (class writer_t *pipe_)  { -    zmq_assert (false); +    zmq_assert (pipe_ == outpipe); +    outpipe = NULL;  }  void zmq::p2p_t::xkill (class reader_t *pipe_)  { -    zmq_assert (false); +    zmq_assert (alive); +    alive = false;  }  void zmq::p2p_t::xrevive (class reader_t *pipe_)  { -    zmq_assert (false); +    zmq_assert (!alive); +    alive = true;  }  int zmq::p2p_t::xsetsockopt (int option_, const void *optval_, @@ -68,31 +82,49 @@ int zmq::p2p_t::xsetsockopt (int option_, const void *optval_,  int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_)  { -    zmq_assert (false); +    if (!outpipe) { +        errno = EAGAIN; +        return -1; +    } + +    //  TODO: Implement this once queue limits are in-place. +    zmq_assert (outpipe->check_write (zmq_msg_size (msg_))); + +    outpipe->write (msg_); +    if (!(flags_ & ZMQ_NOFLUSH)) +        outpipe->flush ();      return 0;  }  int zmq::p2p_t::xflush ()  { -    zmq_assert (false); +    if (outpipe) +        outpipe->flush ();      return 0;  }  int zmq::p2p_t::xrecv (zmq_msg_t *msg_, int flags_)  { -    zmq_assert (false); +    //  Deallocate old content of the message. +    zmq_msg_close (msg_); + +    if (!alive || !inpipe || !inpipe->read (msg_)) { +        errno = EAGAIN; +        return -1; +    }      return 0;  }  bool zmq::p2p_t::xhas_in ()  { -    zmq_assert (false); +    if (alive && inpipe && inpipe->check_read ()) +        return true;      return false;  }  bool zmq::p2p_t::xhas_out ()  { -    zmq_assert (false); -    return false; +    //  TODO: Implement this once queue limits are in-place. +    return true;  } diff --git a/src/p2p.hpp b/src/p2p.hpp index 5bbe111..2ff1047 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -47,6 +47,11 @@ namespace zmq      private: +        class reader_t *inpipe; +        class writer_t *outpipe; + +        bool alive; +          p2p_t (const p2p_t&);          void operator = (const p2p_t&);      };  | 
