summaryrefslogtreecommitdiff
path: root/src/lb.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/lb.cpp')
-rw-r--r--src/lb.cpp49
1 files changed, 22 insertions, 27 deletions
diff --git a/src/lb.cpp b/src/lb.cpp
index 4743ac6..d7193f1 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -54,15 +54,6 @@ void zmq::lb_t::detach (writer_t *pipe_)
pipes.erase (pipe_);
}
-void zmq::lb_t::kill (writer_t *pipe_)
-{
- // Move the pipe to the list of inactive pipes.
- active--;
- if (current == active)
- current = 0;
- pipes.swap (pipes.index (pipe_), active);
-}
-
void zmq::lb_t::revive (writer_t *pipe_)
{
// Move the pipe to the list of active pipes.
@@ -72,42 +63,46 @@ void zmq::lb_t::revive (writer_t *pipe_)
int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
{
+ while (active > 0) {
+ if (pipes [current]->write (msg_))
+ break;
+
+ active--;
+ if (current < active)
+ pipes.swap (current, active);
+ else
+ current = 0;
+ }
+
// If there are no pipes we cannot send the message.
- if (pipes.empty ()) {
+ if (active == 0) {
errno = EAGAIN;
return -1;
}
- // TODO: Implement this once queue limits are in-place.
- zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
-
- // Push message to the selected pipe.
- pipes [current]->write (msg_);
- pipes [current]->flush ();
+ if (!(flags_ & ZMQ_NOFLUSH))
+ pipes [current]->flush ();
// Detach the message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
// Move to the next pipe (load-balancing).
- current++;
- if (current >= active)
- current = 0;
+ current = (current + 1) % active;
return 0;
}
bool zmq::lb_t::has_out ()
{
- for (int count = active; count != 0; count--) {
-
- // We should be able to write at least 1-byte message to interrupt
- // polling for POLLOUT.
- // TODO: Shouldn't we use a saner value here?
- if (pipes [current]->check_write (1))
+ while (active > 0) {
+ if (pipes [current]->check_write ())
return true;
- current++;
- if (current >= active)
+
+ active--;
+ if (current < active)
+ pipes.swap (current, active);
+ else
current = 0;
}