summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Hurton <hurtonm@gmail.com>2010-03-01 16:55:13 +0100
committerMartin Hurton <hurtonm@gmail.com>2010-03-12 11:07:38 +0100
commit5d4f6b18cd57897cc0e77e474118e104a0d5cfc3 (patch)
treeab6ab5009ef679483bec83ac8ecead0d91ba0c94 /src
parentf9521c6b6a35103c03b742a311a34d7b04da0b84 (diff)
Implement flow control for ZMQ_P2P sockets
Diffstat (limited to 'src')
-rw-r--r--src/p2p.cpp21
-rw-r--r--src/p2p.hpp1
2 files changed, 16 insertions, 6 deletions
diff --git a/src/p2p.cpp b/src/p2p.cpp
index 728854b..f81c6c4 100644
--- a/src/p2p.cpp
+++ b/src/p2p.cpp
@@ -47,6 +47,7 @@ void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_,
zmq_assert (!inpipe && !outpipe);
inpipe = inpipe_;
outpipe = outpipe_;
+ outpipe_alive = true;
}
void zmq::p2p_t::xdetach_inpipe (class reader_t *pipe_)
@@ -75,7 +76,8 @@ void zmq::p2p_t::xrevive (class reader_t *pipe_)
void zmq::p2p_t::xrevive (class writer_t *pipe_)
{
- zmq_not_implemented ();
+ zmq_assert (!outpipe_alive);
+ outpipe_alive = true;
}
int zmq::p2p_t::xsetsockopt (int option_, const void *optval_,
@@ -87,13 +89,17 @@ int zmq::p2p_t::xsetsockopt (int option_, const void *optval_,
int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_)
{
- if (!outpipe) {
+ if (outpipe == NULL || !outpipe_alive) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ if (!outpipe->write (msg_)) {
+ outpipe_alive = false;
errno = EAGAIN;
return -1;
}
- bool written = outpipe->write (msg_);
- zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH))
outpipe->flush ();
@@ -132,7 +138,10 @@ bool zmq::p2p_t::xhas_in ()
bool zmq::p2p_t::xhas_out ()
{
- // TODO: Implement this once queue limits are in-place.
- return true;
+ if (outpipe == NULL || !outpipe_alive)
+ return false;
+
+ outpipe_alive = outpipe->check_write ();
+ return outpipe_alive;
}
diff --git a/src/p2p.hpp b/src/p2p.hpp
index e12b58c..97531cf 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -53,6 +53,7 @@ namespace zmq
class writer_t *outpipe;
bool alive;
+ bool outpipe_alive;
p2p_t (const p2p_t&);
void operator = (const p2p_t&);