diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/p2p.cpp | 21 | ||||
-rw-r--r-- | src/p2p.hpp | 1 |
2 files changed, 16 insertions, 6 deletions
diff --git a/src/p2p.cpp b/src/p2p.cpp index 728854b..f81c6c4 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -47,6 +47,7 @@ void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_, zmq_assert (!inpipe && !outpipe); inpipe = inpipe_; outpipe = outpipe_; + outpipe_alive = true; } void zmq::p2p_t::xdetach_inpipe (class reader_t *pipe_) @@ -75,7 +76,8 @@ void zmq::p2p_t::xrevive (class reader_t *pipe_) void zmq::p2p_t::xrevive (class writer_t *pipe_) { - zmq_not_implemented (); + zmq_assert (!outpipe_alive); + outpipe_alive = true; } int zmq::p2p_t::xsetsockopt (int option_, const void *optval_, @@ -87,13 +89,17 @@ int zmq::p2p_t::xsetsockopt (int option_, const void *optval_, int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_) { - if (!outpipe) { + if (outpipe == NULL || !outpipe_alive) { + errno = EAGAIN; + return -1; + } + + if (!outpipe->write (msg_)) { + outpipe_alive = false; errno = EAGAIN; return -1; } - bool written = outpipe->write (msg_); - zmq_assert (written); if (!(flags_ & ZMQ_NOFLUSH)) outpipe->flush (); @@ -132,7 +138,10 @@ bool zmq::p2p_t::xhas_in () bool zmq::p2p_t::xhas_out () { - // TODO: Implement this once queue limits are in-place. - return true; + if (outpipe == NULL || !outpipe_alive) + return false; + + outpipe_alive = outpipe->check_write (); + return outpipe_alive; } diff --git a/src/p2p.hpp b/src/p2p.hpp index e12b58c..97531cf 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -53,6 +53,7 @@ namespace zmq class writer_t *outpipe; bool alive; + bool outpipe_alive; p2p_t (const p2p_t&); void operator = (const p2p_t&); |