From e49115224a7957b0e5d49326bc02ae6af186eaf9 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 15 Dec 2009 09:09:19 +0100 Subject: zmq_encoder/decoder are able to add/trim prefixes from messages; fair queueing and load balancing algorithms factorised into separate classes --- src/upstream.cpp | 58 +++++++------------------------------------------------- 1 file changed, 7 insertions(+), 51 deletions(-) (limited to 'src/upstream.cpp') diff --git a/src/upstream.cpp b/src/upstream.cpp index da202f8..32de63a 100644 --- a/src/upstream.cpp +++ b/src/upstream.cpp @@ -21,12 +21,9 @@ #include "upstream.hpp" #include "err.hpp" -#include "pipe.hpp" zmq::upstream_t::upstream_t (class app_thread_t *parent_) : - socket_base_t (parent_), - active (0), - current (0) + socket_base_t (parent_) { options.requires_in = true; options.requires_out = false; @@ -40,21 +37,13 @@ void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_) { zmq_assert (inpipe_ && !outpipe_); - - pipes.push_back (inpipe_); - pipes.swap (active, pipes.size () - 1); - active++; + fq.attach (inpipe_); } void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_) { - // Remove the pipe from the list; adjust number of active pipes - // accordingly. zmq_assert (pipe_); - pipes_t::size_type index = pipes.index (pipe_); - if (index < active) - active--; - pipes.erase (index); + fq.detach (pipe_); } void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_) @@ -65,16 +54,12 @@ void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_) void zmq::upstream_t::xkill (class reader_t *pipe_) { - // Move the pipe to the list of inactive pipes. - active--; - pipes.swap (pipes.index (pipe_), active); + fq.kill (pipe_); } void zmq::upstream_t::xrevive (class reader_t *pipe_) { - // Move the pipe to the list of active pipes. - pipes.swap (pipes.index (pipe_), active); - active++; + fq.revive (pipe_); } int zmq::upstream_t::xsetsockopt (int option_, const void *optval_, @@ -99,41 +84,12 @@ int zmq::upstream_t::xflush () int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_) { - // Deallocate old content of the message. - zmq_msg_close (msg_); - - // Round-robin over the pipes to get next message. - for (int count = active; count != 0; count--) { - bool fetched = pipes [current]->read (msg_); - current++; - if (current >= active) - current = 0; - if (fetched) - return 0; - } - - // No message is available. Initialise the output parameter - // to be a 0-byte message. - zmq_msg_init (msg_); - errno = EAGAIN; - return -1; + return fq.recv (msg_, flags_); } bool zmq::upstream_t::xhas_in () { - // Note that messing with current doesn't break the fairness of fair - // queueing algorithm. If there are no messages available current will - // get back to its original value. Otherwise it'll point to the first - // pipe holding messages, skipping only pipes with no messages available. - for (int count = active; count != 0; count--) { - if (pipes [current]->check_read ()) - return true; - current++; - if (current >= active) - current = 0; - } - - return false; + return fq.has_in (); } bool zmq::upstream_t::xhas_out () -- cgit v1.2.3