From 06d7a447378c8e9f0805c219deaf8e7e7ef1eeb0 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Tue, 2 Mar 2010 10:48:30 +0100 Subject: Implement flow control for ZMQ_PUB sockets --- src/pub.cpp | 34 ++++++++++++++++++++++++++-------- src/pub.hpp | 7 +++++++ 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/pub.cpp b/src/pub.cpp index b6802fd..2971e5c 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -25,7 +25,8 @@ #include "pipe.hpp" zmq::pub_t::pub_t (class app_thread_t *parent_) : - socket_base_t (parent_) + socket_base_t (parent_), + stalled_pipe (NULL) { options.requires_in = false; options.requires_out = true; @@ -53,6 +54,8 @@ void zmq::pub_t::xdetach_inpipe (class reader_t *pipe_) void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_) { out_pipes.erase (pipe_); + if (pipe_ == stalled_pipe) + stalled_pipe = NULL; } void zmq::pub_t::xkill (class reader_t *pipe_) @@ -67,7 +70,8 @@ void zmq::pub_t::xrevive (class reader_t *pipe_) void zmq::pub_t::xrevive (class writer_t *pipe_) { - zmq_not_implemented (); + zmq_assert (stalled_pipe = pipe_); + stalled_pipe = NULL; } int zmq::pub_t::xsetsockopt (int option_, const void *optval_, @@ -91,11 +95,10 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) } // First check whether all pipes are available for writing. - for (out_pipes_t::size_type i = 0; i != pipes_count; i++) - if (!out_pipes [i]->check_write ()) { - errno = EAGAIN; - return -1; - } + if (!check_write ()) { + errno = EAGAIN; + return -1; + } msg_content_t *content = (msg_content_t*) msg_->content; @@ -171,7 +174,22 @@ bool zmq::pub_t::xhas_in () bool zmq::pub_t::xhas_out () { - // TODO: Reimplement when queue limits are added. + return check_write (); +} + +bool zmq::pub_t::check_write () +{ + if (stalled_pipe != NULL) + return false; + + out_pipes_t::size_type pipes_num = out_pipes.size (); + for (out_pipes_t::size_type i = 0; i < pipes_num; i++) { + if (!out_pipes [i]->check_write ()) { + stalled_pipe = out_pipes [i]; + return false; + } + } + return true; } diff --git a/src/pub.hpp b/src/pub.hpp index a85301f..3a4fe09 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -54,6 +54,13 @@ namespace zmq typedef yarray_t out_pipes_t; out_pipes_t out_pipes; + // Pointer to the pipe we are waiting for to became writable + // again; NULL if tha last send operation was successful. + class writer_t *stalled_pipe; + + // Check whether we can write a message to all pipes. + bool check_write (); + pub_t (const pub_t&); void operator = (const pub_t&); }; -- cgit v1.2.3