summaryrefslogtreecommitdiff
path: root/src/downstream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/downstream.cpp')
-rw-r--r--src/downstream.cpp34
1 files changed, 5 insertions, 29 deletions
diff --git a/src/downstream.cpp b/src/downstream.cpp
index 4f994e6..be1c4cc 100644
--- a/src/downstream.cpp
+++ b/src/downstream.cpp
@@ -24,8 +24,7 @@
#include "pipe.hpp"
zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
- socket_base_t (parent_),
- current (0)
+ socket_base_t (parent_)
{
options.requires_in = false;
options.requires_out = true;
@@ -39,7 +38,7 @@ void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
zmq_assert (!inpipe_ && outpipe_);
- pipes.push_back (outpipe_);
+ lb.attach (outpipe_);
}
void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_)
@@ -51,7 +50,7 @@ void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_)
{
zmq_assert (pipe_);
- pipes.erase (pipes.index (pipe_));
+ lb.detach (pipe_);
}
void zmq::downstream_t::xkill (class reader_t *pipe_)
@@ -76,29 +75,7 @@ int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_)
{
- // If there are no pipes we cannot send the message.
- if (pipes.empty ()) {
- errno = EAGAIN;
- return -1;
- }
-
- // Move to the next pipe (load-balancing).
- current++;
- if (current >= pipes.size ())
- current = 0;
-
- // TODO: Implement this once queue limits are in-place.
- zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
-
- // Push message to the selected pipe.
- pipes [current]->write (msg_);
- pipes [current]->flush ();
-
- // Detach the message from the data buffer.
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
-
- return 0;
+ return lb.send (msg_, flags_);
}
int zmq::downstream_t::xflush ()
@@ -124,8 +101,7 @@ bool zmq::downstream_t::xhas_in ()
bool zmq::downstream_t::xhas_out ()
{
- // TODO: Modify this code once pipe limits are in place.
- return true;
+ return lb.has_out ();
}