diff options
| -rw-r--r-- | src/fq.cpp | 7 | ||||
| -rw-r--r-- | src/lb.cpp | 17 | 
2 files changed, 17 insertions, 7 deletions
@@ -46,8 +46,11 @@ void zmq::fq_t::detach (reader_t *pipe_)  {      //  Remove the pipe from the list; adjust number of active pipes      //  accordingly. -    if (pipes.index (pipe_) < active) +    if (pipes.index (pipe_) < active) {          active--; +        if (current == active) +            current = 0; +    }      pipes.erase (pipe_);  } @@ -55,6 +58,8 @@ void zmq::fq_t::kill (reader_t *pipe_)  {      //  Move the pipe to the list of inactive pipes.      active--; +    if (current == active) +        current = 0;      pipes.swap (pipes.index (pipe_), active);  } @@ -46,8 +46,11 @@ void zmq::lb_t::detach (writer_t *pipe_)  {      //  Remove the pipe from the list; adjust number of active pipes      //  accordingly. -    if (pipes.index (pipe_) < active) +    if (pipes.index (pipe_) < active) {          active--; +        if (current == active) +            current = 0; +    }      pipes.erase (pipe_);  } @@ -55,6 +58,8 @@ void zmq::lb_t::kill (writer_t *pipe_)  {      //  Move the pipe to the list of inactive pipes.      active--; +    if (current == active) +        current = 0;      pipes.swap (pipes.index (pipe_), active);  } @@ -73,11 +78,6 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)          return -1;      } -    //  Move to the next pipe (load-balancing). -    current++; -    if (current >= active) -        current = 0; -      //  TODO: Implement this once queue limits are in-place.      zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_))); @@ -89,6 +89,11 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)      int rc = zmq_msg_init (msg_);      zmq_assert (rc == 0); +    //  Move to the next pipe (load-balancing). +    current++; +    if (current >= active) +        current = 0; +      return 0;  }  | 
