summaryrefslogtreecommitdiff
path: root/src/sub.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-12-15 09:09:19 +0100
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-12-15 09:09:19 +0100
commite49115224a7957b0e5d49326bc02ae6af186eaf9 (patch)
tree81d1ca0ea496004bbc85cec9b3289af96cdaa197 /src/sub.cpp
parentbd792faa9d6c78c375dbc52c6d773e157335da36 (diff)
zmq_encoder/decoder are able to add/trim prefixes from messages; fair queueing and load balancing algorithms factorised into separate classes
Diffstat (limited to 'src/sub.cpp')
-rw-r--r--src/sub.cpp50
1 files changed, 8 insertions, 42 deletions
diff --git a/src/sub.cpp b/src/sub.cpp
index a7f9783..e5dbe76 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -21,12 +21,9 @@
#include "sub.hpp"
#include "err.hpp"
-#include "pipe.hpp"
zmq::sub_t::sub_t (class app_thread_t *parent_) :
socket_base_t (parent_),
- active (0),
- current (0),
all_count (0)
{
options.requires_in = true;
@@ -35,44 +32,35 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) :
zmq::sub_t::~sub_t ()
{
- for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++)
- in_pipes [i]->term ();
- in_pipes.clear ();
}
void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
- zmq_assert (!outpipe_);
- in_pipes.push_back (inpipe_);
- in_pipes.swap (active, in_pipes.size () - 1);
- active++;
+ zmq_assert (inpipe_ && !outpipe_);
+ fq.attach (inpipe_);
}
void zmq::sub_t::xdetach_inpipe (class reader_t *pipe_)
{
- if (in_pipes.index (pipe_) < active)
- active--;
- in_pipes.erase (pipe_);
+ zmq_assert (pipe_);
+ fq.detach (pipe_);
}
void zmq::sub_t::xdetach_outpipe (class writer_t *pipe_)
{
+ // SUB socket is read-only thus there should be no outpipes.
zmq_assert (false);
}
void zmq::sub_t::xkill (class reader_t *pipe_)
{
- // Move the pipe to the list of inactive pipes.
- in_pipes.swap (in_pipes.index (pipe_), active - 1);
- active--;
+ fq.kill (pipe_);
}
void zmq::sub_t::xrevive (class reader_t *pipe_)
{
- // Move the pipe to the list of active pipes.
- in_pipes.swap (in_pipes.index (pipe_), active);
- active++;
+ fq.revive (pipe_);
}
int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
@@ -139,7 +127,7 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
while (true) {
// Get a message using fair queueing algorithm.
- int rc = fq (msg_, flags_);
+ int rc = fq.recv (msg_, flags_);
// If there's no message available, return immediately.
if (rc != 0 && errno == EAGAIN)
@@ -176,28 +164,6 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
}
}
-int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_)
-{
- // Deallocate old content of the message.
- zmq_msg_close (msg_);
-
- // Round-robin over the pipes to get next message.
- for (int count = active; count != 0; count--) {
- bool fetched = in_pipes [current]->read (msg_);
- current++;
- if (current >= active)
- current = 0;
- if (fetched)
- return 0;
- }
-
- // No message is available. Initialise the output parameter
- // to be a 0-byte message.
- zmq_msg_init (msg_);
- errno = EAGAIN;
- return -1;
-}
-
bool zmq::sub_t::xhas_in ()
{
// TODO: This is more complex as we have to ignore all the messages that