diff options
-rw-r--r-- | src/fq.cpp | 7 | ||||
-rw-r--r-- | src/lb.cpp | 17 |
2 files changed, 17 insertions, 7 deletions
@@ -46,8 +46,11 @@ void zmq::fq_t::detach (reader_t *pipe_) { // Remove the pipe from the list; adjust number of active pipes // accordingly. - if (pipes.index (pipe_) < active) + if (pipes.index (pipe_) < active) { active--; + if (current == active) + current = 0; + } pipes.erase (pipe_); } @@ -55,6 +58,8 @@ void zmq::fq_t::kill (reader_t *pipe_) { // Move the pipe to the list of inactive pipes. active--; + if (current == active) + current = 0; pipes.swap (pipes.index (pipe_), active); } @@ -46,8 +46,11 @@ void zmq::lb_t::detach (writer_t *pipe_) { // Remove the pipe from the list; adjust number of active pipes // accordingly. - if (pipes.index (pipe_) < active) + if (pipes.index (pipe_) < active) { active--; + if (current == active) + current = 0; + } pipes.erase (pipe_); } @@ -55,6 +58,8 @@ void zmq::lb_t::kill (writer_t *pipe_) { // Move the pipe to the list of inactive pipes. active--; + if (current == active) + current = 0; pipes.swap (pipes.index (pipe_), active); } @@ -73,11 +78,6 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) return -1; } - // Move to the next pipe (load-balancing). - current++; - if (current >= active) - current = 0; - // TODO: Implement this once queue limits are in-place. zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_))); @@ -89,6 +89,11 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); + // Move to the next pipe (load-balancing). + current++; + if (current >= active) + current = 0; + return 0; } |