diff options
| author | Martin Hurton <hurtonm@gmail.com> | 2010-03-02 22:23:34 +0100 | 
|---|---|---|
| committer | Martin Hurton <hurtonm@gmail.com> | 2010-03-12 11:07:39 +0100 | 
| commit | 923609b0922c3bf07f16c8c99aba4fe98f08ef60 (patch) | |
| tree | df52d53536c6ed2ea8487f03137cf6bc66320444 /src/req.cpp | |
| parent | 42e575cb6b62fe1e5d12d2e4fb5c6874d47eb57e (diff) | |
Implement flow control for ZMQ_REQ sockets
Diffstat (limited to 'src/req.cpp')
| -rw-r--r-- | src/req.cpp | 98 | 
1 files changed, 77 insertions, 21 deletions
| diff --git a/src/req.cpp b/src/req.cpp index f21613a..5b09589 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -25,6 +25,7 @@  zmq::req_t::req_t (class app_thread_t *parent_) :      socket_base_t (parent_), +    active (0),      current (0),      waiting_for_reply (false),      reply_pipe_active (false), @@ -45,7 +46,12 @@ void zmq::req_t::xattach_pipes (class reader_t *inpipe_,      zmq_assert (in_pipes.size () == out_pipes.size ());      in_pipes.push_back (inpipe_); +    in_pipes.swap (active, in_pipes.size () - 1); +      out_pipes.push_back (outpipe_); +    out_pipes.swap (active, out_pipes.size () - 1); + +    active++;  }  void zmq::req_t::xdetach_inpipe (class reader_t *pipe_) @@ -61,16 +67,28 @@ void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)      in_pipes_t::size_type index = in_pipes.index (pipe_); -    //  If corresponding outpipe is still in place simply nullify the pointer -    //  to the inpipe. +    //  If the corresponding outpipe is still in place nullify the pointer +    //  to the inpipe ane move both pipes into inactive zone.      if (out_pipes [index]) {          in_pipes [index] = NULL; +        if (index < active) { +            active--; +            in_pipes.swap (index, active); +            out_pipes.swap (index, active); +            if (current = active) +                current = 0; +        }          return;      }      //  Now both inpipe and outpipe are detached. Remove them from the lists.      in_pipes.erase (index);      out_pipes.erase (index); +    if (index < active) { +        active--; +        if (current == active) +            current = 0; +    }  }  void zmq::req_t::xdetach_outpipe (class writer_t *pipe_) @@ -80,20 +98,33 @@ void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)      out_pipes_t::size_type index = out_pipes.index (pipe_); -    //  If corresponding inpipe is still in place simply nullify the pointer -    //  to the outpipe. +    //  If the corresponding inpipe is still in place nullify the pointer +    //  to the outpipe and move both pipes into inactive zone.      if (in_pipes [index]) {          out_pipes [index] = NULL; +        if (index < active) { +            active--; +            in_pipes.swap (index, active); +            out_pipes.swap (index, active); +            if (current == active) +                current = 0; +        }          return;      }      //  Now both inpipe and outpipe are detached. Remove them from the lists.      in_pipes.erase (index);      out_pipes.erase (index); +    if (index < active) { +        active--; +        if (current == active) +            current = 0; +    }  }  void zmq::req_t::xkill (class reader_t *pipe_)  { +    zmq_assert (waiting_for_reply);      zmq_assert (pipe_ == reply_pipe);      reply_pipe_active = false; @@ -104,13 +135,19 @@ void zmq::req_t::xrevive (class reader_t *pipe_)      //  TODO: Actually, misbehaving peer can cause this kind of thing.      //  Handle it decently, presumably kill the offending connection.      zmq_assert (pipe_ == reply_pipe); -      reply_pipe_active = true;  }  void zmq::req_t::xrevive (class writer_t *pipe_)  { -    zmq_not_implemented (); +    out_pipes_t::size_type index = out_pipes.index (pipe_); +    zmq_assert (index >= active); + +    if (in_pipes [index] != NULL) { +        in_pipes.swap (index, active); +        out_pipes.swap (index, active); +        active++; +    }  }  int zmq::req_t::xsetsockopt (int option_, const void *optval_, @@ -129,24 +166,24 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)          return -1;      } -    if (out_pipes.empty ()) { -        errno = EAGAIN; -        return -1; -    } +    while (active > 0) { +        if (out_pipes [current]->check_write ()) +            break; -    current++; -    if (current >= out_pipes.size ()) -        current = 0; - -    //  TODO: Infinite loop can result here. Integrate the algorithm with -    //  the active pipes list (i.e. pipe pair that has one pipe missing is -    //  considered to be inactive. -    while (!in_pipes [current] || !out_pipes [current]) { -        current++; -        if (current >= out_pipes.size ()) +        active--; +        if (current < active) { +            in_pipes.swap (current, active); +            out_pipes.swap (current, active); +        } +        else              current = 0;      } +    if (active == 0) { +        errno = EAGAIN; +        return -1; +    } +      //  Push message to the selected pipe.      bool written = out_pipes [current]->write (msg_);      zmq_assert (written); @@ -164,6 +201,9 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)      int rc = zmq_msg_init (msg_);      zmq_assert (rc == 0); +    //  Move to the next pipe (load-balancing). +    current = (current + 1) % active; +      return 0;  } @@ -213,7 +253,23 @@ bool zmq::req_t::xhas_in ()  bool zmq::req_t::xhas_out ()  { -    return !waiting_for_reply; +    if (waiting_for_reply) +        return false; + +    while (active > 0) { +        if (out_pipes [current]->check_write ()) +            return true;; + +        active--; +        if (current < active) { +            in_pipes.swap (current, active); +            out_pipes.swap (current, active); +        } +        else +            current = 0; +    } + +    return false;  } | 
