From e49115224a7957b0e5d49326bc02ae6af186eaf9 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 15 Dec 2009 09:09:19 +0100 Subject: zmq_encoder/decoder are able to add/trim prefixes from messages; fair queueing and load balancing algorithms factorised into separate classes --- src/sub.cpp | 50 ++++++++------------------------------------------ 1 file changed, 8 insertions(+), 42 deletions(-) (limited to 'src/sub.cpp') diff --git a/src/sub.cpp b/src/sub.cpp index a7f9783..e5dbe76 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -21,12 +21,9 @@ #include "sub.hpp" #include "err.hpp" -#include "pipe.hpp" zmq::sub_t::sub_t (class app_thread_t *parent_) : socket_base_t (parent_), - active (0), - current (0), all_count (0) { options.requires_in = true; @@ -35,44 +32,35 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) : zmq::sub_t::~sub_t () { - for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++) - in_pipes [i]->term (); - in_pipes.clear (); } void zmq::sub_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_) { - zmq_assert (!outpipe_); - in_pipes.push_back (inpipe_); - in_pipes.swap (active, in_pipes.size () - 1); - active++; + zmq_assert (inpipe_ && !outpipe_); + fq.attach (inpipe_); } void zmq::sub_t::xdetach_inpipe (class reader_t *pipe_) { - if (in_pipes.index (pipe_) < active) - active--; - in_pipes.erase (pipe_); + zmq_assert (pipe_); + fq.detach (pipe_); } void zmq::sub_t::xdetach_outpipe (class writer_t *pipe_) { + // SUB socket is read-only thus there should be no outpipes. zmq_assert (false); } void zmq::sub_t::xkill (class reader_t *pipe_) { - // Move the pipe to the list of inactive pipes. - in_pipes.swap (in_pipes.index (pipe_), active - 1); - active--; + fq.kill (pipe_); } void zmq::sub_t::xrevive (class reader_t *pipe_) { - // Move the pipe to the list of active pipes. - in_pipes.swap (in_pipes.index (pipe_), active); - active++; + fq.revive (pipe_); } int zmq::sub_t::xsetsockopt (int option_, const void *optval_, @@ -139,7 +127,7 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_) while (true) { // Get a message using fair queueing algorithm. - int rc = fq (msg_, flags_); + int rc = fq.recv (msg_, flags_); // If there's no message available, return immediately. if (rc != 0 && errno == EAGAIN) @@ -176,28 +164,6 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_) } } -int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_) -{ - // Deallocate old content of the message. - zmq_msg_close (msg_); - - // Round-robin over the pipes to get next message. - for (int count = active; count != 0; count--) { - bool fetched = in_pipes [current]->read (msg_); - current++; - if (current >= active) - current = 0; - if (fetched) - return 0; - } - - // No message is available. Initialise the output parameter - // to be a 0-byte message. - zmq_msg_init (msg_); - errno = EAGAIN; - return -1; -} - bool zmq::sub_t::xhas_in () { // TODO: This is more complex as we have to ignore all the messages that -- cgit v1.2.3