diff options
author | Martin Hurton <hurtonm@gmail.com> | 2010-03-02 22:23:34 +0100 |
---|---|---|
committer | Martin Hurton <hurtonm@gmail.com> | 2010-03-12 11:07:39 +0100 |
commit | 923609b0922c3bf07f16c8c99aba4fe98f08ef60 (patch) | |
tree | df52d53536c6ed2ea8487f03137cf6bc66320444 /src | |
parent | 42e575cb6b62fe1e5d12d2e4fb5c6874d47eb57e (diff) |
Implement flow control for ZMQ_REQ sockets
Diffstat (limited to 'src')
-rw-r--r-- | src/req.cpp | 98 | ||||
-rw-r--r-- | src/req.hpp | 3 |
2 files changed, 80 insertions, 21 deletions
diff --git a/src/req.cpp b/src/req.cpp index f21613a..5b09589 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -25,6 +25,7 @@ zmq::req_t::req_t (class app_thread_t *parent_) : socket_base_t (parent_), + active (0), current (0), waiting_for_reply (false), reply_pipe_active (false), @@ -45,7 +46,12 @@ void zmq::req_t::xattach_pipes (class reader_t *inpipe_, zmq_assert (in_pipes.size () == out_pipes.size ()); in_pipes.push_back (inpipe_); + in_pipes.swap (active, in_pipes.size () - 1); + out_pipes.push_back (outpipe_); + out_pipes.swap (active, out_pipes.size () - 1); + + active++; } void zmq::req_t::xdetach_inpipe (class reader_t *pipe_) @@ -61,16 +67,28 @@ void zmq::req_t::xdetach_inpipe (class reader_t *pipe_) in_pipes_t::size_type index = in_pipes.index (pipe_); - // If corresponding outpipe is still in place simply nullify the pointer - // to the inpipe. + // If the corresponding outpipe is still in place nullify the pointer + // to the inpipe ane move both pipes into inactive zone. if (out_pipes [index]) { in_pipes [index] = NULL; + if (index < active) { + active--; + in_pipes.swap (index, active); + out_pipes.swap (index, active); + if (current = active) + current = 0; + } return; } // Now both inpipe and outpipe are detached. Remove them from the lists. in_pipes.erase (index); out_pipes.erase (index); + if (index < active) { + active--; + if (current == active) + current = 0; + } } void zmq::req_t::xdetach_outpipe (class writer_t *pipe_) @@ -80,20 +98,33 @@ void zmq::req_t::xdetach_outpipe (class writer_t *pipe_) out_pipes_t::size_type index = out_pipes.index (pipe_); - // If corresponding inpipe is still in place simply nullify the pointer - // to the outpipe. + // If the corresponding inpipe is still in place nullify the pointer + // to the outpipe and move both pipes into inactive zone. if (in_pipes [index]) { out_pipes [index] = NULL; + if (index < active) { + active--; + in_pipes.swap (index, active); + out_pipes.swap (index, active); + if (current == active) + current = 0; + } return; } // Now both inpipe and outpipe are detached. Remove them from the lists. in_pipes.erase (index); out_pipes.erase (index); + if (index < active) { + active--; + if (current == active) + current = 0; + } } void zmq::req_t::xkill (class reader_t *pipe_) { + zmq_assert (waiting_for_reply); zmq_assert (pipe_ == reply_pipe); reply_pipe_active = false; @@ -104,13 +135,19 @@ void zmq::req_t::xrevive (class reader_t *pipe_) // TODO: Actually, misbehaving peer can cause this kind of thing. // Handle it decently, presumably kill the offending connection. zmq_assert (pipe_ == reply_pipe); - reply_pipe_active = true; } void zmq::req_t::xrevive (class writer_t *pipe_) { - zmq_not_implemented (); + out_pipes_t::size_type index = out_pipes.index (pipe_); + zmq_assert (index >= active); + + if (in_pipes [index] != NULL) { + in_pipes.swap (index, active); + out_pipes.swap (index, active); + active++; + } } int zmq::req_t::xsetsockopt (int option_, const void *optval_, @@ -129,24 +166,24 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) return -1; } - if (out_pipes.empty ()) { - errno = EAGAIN; - return -1; - } + while (active > 0) { + if (out_pipes [current]->check_write ()) + break; - current++; - if (current >= out_pipes.size ()) - current = 0; - - // TODO: Infinite loop can result here. Integrate the algorithm with - // the active pipes list (i.e. pipe pair that has one pipe missing is - // considered to be inactive. - while (!in_pipes [current] || !out_pipes [current]) { - current++; - if (current >= out_pipes.size ()) + active--; + if (current < active) { + in_pipes.swap (current, active); + out_pipes.swap (current, active); + } + else current = 0; } + if (active == 0) { + errno = EAGAIN; + return -1; + } + // Push message to the selected pipe. bool written = out_pipes [current]->write (msg_); zmq_assert (written); @@ -164,6 +201,9 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); + // Move to the next pipe (load-balancing). + current = (current + 1) % active; + return 0; } @@ -213,7 +253,23 @@ bool zmq::req_t::xhas_in () bool zmq::req_t::xhas_out () { - return !waiting_for_reply; + if (waiting_for_reply) + return false; + + while (active > 0) { + if (out_pipes [current]->check_write ()) + return true;; + + active--; + if (current < active) { + in_pipes.swap (current, active); + out_pipes.swap (current, active); + } + else + current = 0; + } + + return false; } diff --git a/src/req.hpp b/src/req.hpp index 4058b08..531c06f 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -64,6 +64,9 @@ namespace zmq typedef yarray_t <class reader_t> in_pipes_t; in_pipes_t in_pipes; + // Number of active pipes. + size_t active; + // Req_t load-balances the requests - 'current' points to the session // that's processing the request at the moment. out_pipes_t::size_type current; |