summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Hurton <hurtonm@gmail.com>2010-03-02 10:48:30 +0100
committerMartin Hurton <hurtonm@gmail.com>2010-03-12 11:07:38 +0100
commit06d7a447378c8e9f0805c219deaf8e7e7ef1eeb0 (patch)
tree5d06fda5d5cdb30b81c3f5712c1b07593ff0e645 /src
parentf9c84a1a689f4f64cfa45cb22d4f02ec246c7f93 (diff)
Implement flow control for ZMQ_PUB sockets
Diffstat (limited to 'src')
-rw-r--r--src/pub.cpp34
-rw-r--r--src/pub.hpp7
2 files changed, 33 insertions, 8 deletions
diff --git a/src/pub.cpp b/src/pub.cpp
index b6802fd..2971e5c 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -25,7 +25,8 @@
#include "pipe.hpp"
zmq::pub_t::pub_t (class app_thread_t *parent_) :
- socket_base_t (parent_)
+ socket_base_t (parent_),
+ stalled_pipe (NULL)
{
options.requires_in = false;
options.requires_out = true;
@@ -53,6 +54,8 @@ void zmq::pub_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_)
{
out_pipes.erase (pipe_);
+ if (pipe_ == stalled_pipe)
+ stalled_pipe = NULL;
}
void zmq::pub_t::xkill (class reader_t *pipe_)
@@ -67,7 +70,8 @@ void zmq::pub_t::xrevive (class reader_t *pipe_)
void zmq::pub_t::xrevive (class writer_t *pipe_)
{
- zmq_not_implemented ();
+ zmq_assert (stalled_pipe = pipe_);
+ stalled_pipe = NULL;
}
int zmq::pub_t::xsetsockopt (int option_, const void *optval_,
@@ -91,11 +95,10 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
}
// First check whether all pipes are available for writing.
- for (out_pipes_t::size_type i = 0; i != pipes_count; i++)
- if (!out_pipes [i]->check_write ()) {
- errno = EAGAIN;
- return -1;
- }
+ if (!check_write ()) {
+ errno = EAGAIN;
+ return -1;
+ }
msg_content_t *content = (msg_content_t*) msg_->content;
@@ -171,7 +174,22 @@ bool zmq::pub_t::xhas_in ()
bool zmq::pub_t::xhas_out ()
{
- // TODO: Reimplement when queue limits are added.
+ return check_write ();
+}
+
+bool zmq::pub_t::check_write ()
+{
+ if (stalled_pipe != NULL)
+ return false;
+
+ out_pipes_t::size_type pipes_num = out_pipes.size ();
+ for (out_pipes_t::size_type i = 0; i < pipes_num; i++) {
+ if (!out_pipes [i]->check_write ()) {
+ stalled_pipe = out_pipes [i];
+ return false;
+ }
+ }
+
return true;
}
diff --git a/src/pub.hpp b/src/pub.hpp
index a85301f..3a4fe09 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -54,6 +54,13 @@ namespace zmq
typedef yarray_t <class writer_t> out_pipes_t;
out_pipes_t out_pipes;
+ // Pointer to the pipe we are waiting for to became writable
+ // again; NULL if tha last send operation was successful.
+ class writer_t *stalled_pipe;
+
+ // Check whether we can write a message to all pipes.
+ bool check_write ();
+
pub_t (const pub_t&);
void operator = (const pub_t&);
};