summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Hurton <hurtonm@gmail.com>2010-03-02 22:23:34 +0100
committerMartin Hurton <hurtonm@gmail.com>2010-03-12 11:07:39 +0100
commit923609b0922c3bf07f16c8c99aba4fe98f08ef60 (patch)
treedf52d53536c6ed2ea8487f03137cf6bc66320444 /src
parent42e575cb6b62fe1e5d12d2e4fb5c6874d47eb57e (diff)
Implement flow control for ZMQ_REQ sockets
Diffstat (limited to 'src')
-rw-r--r--src/req.cpp98
-rw-r--r--src/req.hpp3
2 files changed, 80 insertions, 21 deletions
diff --git a/src/req.cpp b/src/req.cpp
index f21613a..5b09589 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -25,6 +25,7 @@
zmq::req_t::req_t (class app_thread_t *parent_) :
socket_base_t (parent_),
+ active (0),
current (0),
waiting_for_reply (false),
reply_pipe_active (false),
@@ -45,7 +46,12 @@ void zmq::req_t::xattach_pipes (class reader_t *inpipe_,
zmq_assert (in_pipes.size () == out_pipes.size ());
in_pipes.push_back (inpipe_);
+ in_pipes.swap (active, in_pipes.size () - 1);
+
out_pipes.push_back (outpipe_);
+ out_pipes.swap (active, out_pipes.size () - 1);
+
+ active++;
}
void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)
@@ -61,16 +67,28 @@ void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)
in_pipes_t::size_type index = in_pipes.index (pipe_);
- // If corresponding outpipe is still in place simply nullify the pointer
- // to the inpipe.
+ // If the corresponding outpipe is still in place nullify the pointer
+ // to the inpipe ane move both pipes into inactive zone.
if (out_pipes [index]) {
in_pipes [index] = NULL;
+ if (index < active) {
+ active--;
+ in_pipes.swap (index, active);
+ out_pipes.swap (index, active);
+ if (current = active)
+ current = 0;
+ }
return;
}
// Now both inpipe and outpipe are detached. Remove them from the lists.
in_pipes.erase (index);
out_pipes.erase (index);
+ if (index < active) {
+ active--;
+ if (current == active)
+ current = 0;
+ }
}
void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)
@@ -80,20 +98,33 @@ void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)
out_pipes_t::size_type index = out_pipes.index (pipe_);
- // If corresponding inpipe is still in place simply nullify the pointer
- // to the outpipe.
+ // If the corresponding inpipe is still in place nullify the pointer
+ // to the outpipe and move both pipes into inactive zone.
if (in_pipes [index]) {
out_pipes [index] = NULL;
+ if (index < active) {
+ active--;
+ in_pipes.swap (index, active);
+ out_pipes.swap (index, active);
+ if (current == active)
+ current = 0;
+ }
return;
}
// Now both inpipe and outpipe are detached. Remove them from the lists.
in_pipes.erase (index);
out_pipes.erase (index);
+ if (index < active) {
+ active--;
+ if (current == active)
+ current = 0;
+ }
}
void zmq::req_t::xkill (class reader_t *pipe_)
{
+ zmq_assert (waiting_for_reply);
zmq_assert (pipe_ == reply_pipe);
reply_pipe_active = false;
@@ -104,13 +135,19 @@ void zmq::req_t::xrevive (class reader_t *pipe_)
// TODO: Actually, misbehaving peer can cause this kind of thing.
// Handle it decently, presumably kill the offending connection.
zmq_assert (pipe_ == reply_pipe);
-
reply_pipe_active = true;
}
void zmq::req_t::xrevive (class writer_t *pipe_)
{
- zmq_not_implemented ();
+ out_pipes_t::size_type index = out_pipes.index (pipe_);
+ zmq_assert (index >= active);
+
+ if (in_pipes [index] != NULL) {
+ in_pipes.swap (index, active);
+ out_pipes.swap (index, active);
+ active++;
+ }
}
int zmq::req_t::xsetsockopt (int option_, const void *optval_,
@@ -129,24 +166,24 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
return -1;
}
- if (out_pipes.empty ()) {
- errno = EAGAIN;
- return -1;
- }
+ while (active > 0) {
+ if (out_pipes [current]->check_write ())
+ break;
- current++;
- if (current >= out_pipes.size ())
- current = 0;
-
- // TODO: Infinite loop can result here. Integrate the algorithm with
- // the active pipes list (i.e. pipe pair that has one pipe missing is
- // considered to be inactive.
- while (!in_pipes [current] || !out_pipes [current]) {
- current++;
- if (current >= out_pipes.size ())
+ active--;
+ if (current < active) {
+ in_pipes.swap (current, active);
+ out_pipes.swap (current, active);
+ }
+ else
current = 0;
}
+ if (active == 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+
// Push message to the selected pipe.
bool written = out_pipes [current]->write (msg_);
zmq_assert (written);
@@ -164,6 +201,9 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
+ // Move to the next pipe (load-balancing).
+ current = (current + 1) % active;
+
return 0;
}
@@ -213,7 +253,23 @@ bool zmq::req_t::xhas_in ()
bool zmq::req_t::xhas_out ()
{
- return !waiting_for_reply;
+ if (waiting_for_reply)
+ return false;
+
+ while (active > 0) {
+ if (out_pipes [current]->check_write ())
+ return true;;
+
+ active--;
+ if (current < active) {
+ in_pipes.swap (current, active);
+ out_pipes.swap (current, active);
+ }
+ else
+ current = 0;
+ }
+
+ return false;
}
diff --git a/src/req.hpp b/src/req.hpp
index 4058b08..531c06f 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -64,6 +64,9 @@ namespace zmq
typedef yarray_t <class reader_t> in_pipes_t;
in_pipes_t in_pipes;
+ // Number of active pipes.
+ size_t active;
+
// Req_t load-balances the requests - 'current' points to the session
// that's processing the request at the moment.
out_pipes_t::size_type current;