summaryrefslogtreecommitdiff
path: root/src/upstream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/upstream.cpp')
-rw-r--r--src/upstream.cpp58
1 files changed, 7 insertions, 51 deletions
diff --git a/src/upstream.cpp b/src/upstream.cpp
index da202f8..32de63a 100644
--- a/src/upstream.cpp
+++ b/src/upstream.cpp
@@ -21,12 +21,9 @@
#include "upstream.hpp"
#include "err.hpp"
-#include "pipe.hpp"
zmq::upstream_t::upstream_t (class app_thread_t *parent_) :
- socket_base_t (parent_),
- active (0),
- current (0)
+ socket_base_t (parent_)
{
options.requires_in = true;
options.requires_out = false;
@@ -40,21 +37,13 @@ void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
zmq_assert (inpipe_ && !outpipe_);
-
- pipes.push_back (inpipe_);
- pipes.swap (active, pipes.size () - 1);
- active++;
+ fq.attach (inpipe_);
}
void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_)
{
- // Remove the pipe from the list; adjust number of active pipes
- // accordingly.
zmq_assert (pipe_);
- pipes_t::size_type index = pipes.index (pipe_);
- if (index < active)
- active--;
- pipes.erase (index);
+ fq.detach (pipe_);
}
void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_)
@@ -65,16 +54,12 @@ void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_)
void zmq::upstream_t::xkill (class reader_t *pipe_)
{
- // Move the pipe to the list of inactive pipes.
- active--;
- pipes.swap (pipes.index (pipe_), active);
+ fq.kill (pipe_);
}
void zmq::upstream_t::xrevive (class reader_t *pipe_)
{
- // Move the pipe to the list of active pipes.
- pipes.swap (pipes.index (pipe_), active);
- active++;
+ fq.revive (pipe_);
}
int zmq::upstream_t::xsetsockopt (int option_, const void *optval_,
@@ -99,41 +84,12 @@ int zmq::upstream_t::xflush ()
int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_)
{
- // Deallocate old content of the message.
- zmq_msg_close (msg_);
-
- // Round-robin over the pipes to get next message.
- for (int count = active; count != 0; count--) {
- bool fetched = pipes [current]->read (msg_);
- current++;
- if (current >= active)
- current = 0;
- if (fetched)
- return 0;
- }
-
- // No message is available. Initialise the output parameter
- // to be a 0-byte message.
- zmq_msg_init (msg_);
- errno = EAGAIN;
- return -1;
+ return fq.recv (msg_, flags_);
}
bool zmq::upstream_t::xhas_in ()
{
- // Note that messing with current doesn't break the fairness of fair
- // queueing algorithm. If there are no messages available current will
- // get back to its original value. Otherwise it'll point to the first
- // pipe holding messages, skipping only pipes with no messages available.
- for (int count = active; count != 0; count--) {
- if (pipes [current]->check_read ())
- return true;
- current++;
- if (current >= active)
- current = 0;
- }
-
- return false;
+ return fq.has_in ();
}
bool zmq::upstream_t::xhas_out ()