From 70ea8e9d4b88a1ecb1c076eccc2e9bd872c2230c Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 30 Jan 2010 13:40:50 +0100 Subject: ZMQII-48: Implement P2P socket type --- src/p2p.cpp | 56 ++++++++++++++++++++++++++++++++++++++++++++------------ src/p2p.hpp | 5 +++++ 2 files changed, 49 insertions(+), 12 deletions(-) diff --git a/src/p2p.cpp b/src/p2p.cpp index ae2424a..9a5e186 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -21,9 +21,13 @@ #include "p2p.hpp" #include "err.hpp" +#include "pipe.hpp" zmq::p2p_t::p2p_t (class app_thread_t *parent_) : - socket_base_t (parent_) + socket_base_t (parent_), + inpipe (NULL), + outpipe (NULL), + alive (true) { options.requires_in = true; options.requires_out = true; @@ -31,32 +35,42 @@ zmq::p2p_t::p2p_t (class app_thread_t *parent_) : zmq::p2p_t::~p2p_t () { + if (inpipe) + inpipe->term (); + if (outpipe) + outpipe->term (); } void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_) { - zmq_assert (false); + zmq_assert (!inpipe && !outpipe); + inpipe = inpipe_; + outpipe = outpipe_; } void zmq::p2p_t::xdetach_inpipe (class reader_t *pipe_) { - zmq_assert (false); + zmq_assert (pipe_ == inpipe); + inpipe = NULL; } void zmq::p2p_t::xdetach_outpipe (class writer_t *pipe_) { - zmq_assert (false); + zmq_assert (pipe_ == outpipe); + outpipe = NULL; } void zmq::p2p_t::xkill (class reader_t *pipe_) { - zmq_assert (false); + zmq_assert (alive); + alive = false; } void zmq::p2p_t::xrevive (class reader_t *pipe_) { - zmq_assert (false); + zmq_assert (!alive); + alive = true; } int zmq::p2p_t::xsetsockopt (int option_, const void *optval_, @@ -68,31 +82,49 @@ int zmq::p2p_t::xsetsockopt (int option_, const void *optval_, int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_) { - zmq_assert (false); + if (!outpipe) { + errno = EAGAIN; + return -1; + } + + // TODO: Implement this once queue limits are in-place. + zmq_assert (outpipe->check_write (zmq_msg_size (msg_))); + + outpipe->write (msg_); + if (!(flags_ & ZMQ_NOFLUSH)) + outpipe->flush (); return 0; } int zmq::p2p_t::xflush () { - zmq_assert (false); + if (outpipe) + outpipe->flush (); return 0; } int zmq::p2p_t::xrecv (zmq_msg_t *msg_, int flags_) { - zmq_assert (false); + // Deallocate old content of the message. + zmq_msg_close (msg_); + + if (!alive || !inpipe || !inpipe->read (msg_)) { + errno = EAGAIN; + return -1; + } return 0; } bool zmq::p2p_t::xhas_in () { - zmq_assert (false); + if (alive && inpipe && inpipe->check_read ()) + return true; return false; } bool zmq::p2p_t::xhas_out () { - zmq_assert (false); - return false; + // TODO: Implement this once queue limits are in-place. + return true; } diff --git a/src/p2p.hpp b/src/p2p.hpp index 5bbe111..2ff1047 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -47,6 +47,11 @@ namespace zmq private: + class reader_t *inpipe; + class writer_t *outpipe; + + bool alive; + p2p_t (const p2p_t&); void operator = (const p2p_t&); }; -- cgit v1.2.3