diff options
Diffstat (limited to 'src/lb.cpp')
-rw-r--r-- | src/lb.cpp | 49 |
1 files changed, 22 insertions, 27 deletions
@@ -54,15 +54,6 @@ void zmq::lb_t::detach (writer_t *pipe_) pipes.erase (pipe_); } -void zmq::lb_t::kill (writer_t *pipe_) -{ - // Move the pipe to the list of inactive pipes. - active--; - if (current == active) - current = 0; - pipes.swap (pipes.index (pipe_), active); -} - void zmq::lb_t::revive (writer_t *pipe_) { // Move the pipe to the list of active pipes. @@ -72,42 +63,46 @@ void zmq::lb_t::revive (writer_t *pipe_) int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) { + while (active > 0) { + if (pipes [current]->write (msg_)) + break; + + active--; + if (current < active) + pipes.swap (current, active); + else + current = 0; + } + // If there are no pipes we cannot send the message. - if (pipes.empty ()) { + if (active == 0) { errno = EAGAIN; return -1; } - // TODO: Implement this once queue limits are in-place. - zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_))); - - // Push message to the selected pipe. - pipes [current]->write (msg_); - pipes [current]->flush (); + if (!(flags_ & ZMQ_NOFLUSH)) + pipes [current]->flush (); // Detach the message from the data buffer. int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); // Move to the next pipe (load-balancing). - current++; - if (current >= active) - current = 0; + current = (current + 1) % active; return 0; } bool zmq::lb_t::has_out () { - for (int count = active; count != 0; count--) { - - // We should be able to write at least 1-byte message to interrupt - // polling for POLLOUT. - // TODO: Shouldn't we use a saner value here? - if (pipes [current]->check_write (1)) + while (active > 0) { + if (pipes [current]->check_write ()) return true; - current++; - if (current >= active) + + active--; + if (current < active) + pipes.swap (current, active); + else current = 0; } |