From 12532c7940f23fcb3cd46208c141d47647e76231 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 22 Jun 2011 16:51:40 +0200 Subject: O(1) fair-queueing in XREP implemented Up to now the complexity of fair-queueing in XREP was O(n). Signed-off-by: Martin Sustrik --- src/pipe.cpp | 13 ++++++- src/pipe.hpp | 8 ++++ src/xrep.cpp | 117 ++++++++++++++--------------------------------------------- src/xrep.hpp | 17 ++------- 4 files changed, 50 insertions(+), 105 deletions(-) (limited to 'src') diff --git a/src/pipe.cpp b/src/pipe.cpp index 1813ca0..c290bae 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -63,7 +63,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, peer (NULL), sink (NULL), state (active), - delay (delay_) + delay (delay_), + pipe_id (0) { } @@ -85,6 +86,16 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_) sink = sink_; } +void zmq::pipe_t::set_pipe_id (uint32_t id_) +{ + pipe_id = id_; +} + +uint32_t zmq::pipe_t::get_pipe_id () +{ + return pipe_id; +} + bool zmq::pipe_t::check_read () { if (unlikely (!in_active || (state != active && state != pending))) diff --git a/src/pipe.hpp b/src/pipe.hpp index d3bf866..df85bc2 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -25,6 +25,7 @@ #include "ypipe.hpp" #include "config.hpp" #include "object.hpp" +#include "stdint.hpp" #include "array.hpp" namespace zmq @@ -67,6 +68,10 @@ namespace zmq // Specifies the object to send events to. void set_event_sink (i_pipe_events *sink_); + // Pipe endpoint can store an opaque ID to be used by its clients. + void set_pipe_id (uint32_t id_); + uint32_t get_pipe_id (); + // Returns true if there is at least one message to read in the pipe. bool check_read (); @@ -176,6 +181,9 @@ namespace zmq // asks us to. bool delay; + // Opaque ID. To be used by the clients, not the pipe itself. + uint32_t pipe_id; + // Returns true if the message is delimiter; false otherwise. static bool is_delimiter (msg_t &msg_); diff --git a/src/xrep.cpp b/src/xrep.cpp index ab2f0a8..4a474cf 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -26,7 +26,6 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : socket_base_t (parent_, tid_), - current_in (0), prefetched (false), more_in (false), current_out (NULL), @@ -34,14 +33,16 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : { options.type = ZMQ_XREP; + prefetched_msg.init (); + // Start the peer ID sequence from a random point. generate_random (&next_peer_id, sizeof (next_peer_id)); } zmq::xrep_t::~xrep_t () { - zmq_assert (inpipes.empty ()); zmq_assert (outpipes.empty ()); + prefetched_msg.close (); } void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) @@ -68,8 +69,8 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) zmq_assert (ok); // Add the pipe to the list of inbound pipes. - inpipe_t inpipe = {pipe_, next_peer_id, true}; - inpipes.push_back (inpipe); + pipe_->set_pipe_id (next_peer_id); + fq.attach (pipe_); // Advance next peer ID so that if new connection is dropped shortly after // its creation we don't accidentally get two subsequent peers with @@ -79,20 +80,8 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) void zmq::xrep_t::xterminated (pipe_t *pipe_) { - for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); - ++it) { - if (it->pipe == pipe_) { - if ((inpipes_t::size_type) (it - inpipes.begin ()) < current_in) - current_in--; - inpipes.erase (it); - if (current_in >= inpipes.size ()) - current_in = 0; - goto clean_outpipes; - } - } - zmq_assert (false); + fq.terminated (pipe_); -clean_outpipes: for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); ++it) { if (it->second.pipe == pipe_) { @@ -107,15 +96,7 @@ clean_outpipes: void zmq::xrep_t::xread_activated (pipe_t *pipe_) { - for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); - ++it) { - if (it->pipe == pipe_) { - zmq_assert (!it->active); - it->active = true; - return; - } - } - zmq_assert (false); + fq.activated (pipe_); } void zmq::xrep_t::xwrite_activated (pipe_t *pipe_) @@ -212,55 +193,30 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) return 0; } - // Deallocate old content of the message. - int rc = msg_->close (); - errno_assert (rc == 0); + // Get next message part. + pipe_t *pipe; + int rc = fq.recvpipe (msg_, flags_, &pipe); + if (rc != 0) + return -1; - // If we are in the middle of reading a message, just grab next part of it. + // If we are in the middle of reading a message, just return the next part. if (more_in) { - zmq_assert (inpipes [current_in].active); - bool fetched = inpipes [current_in].pipe->read (msg_); - zmq_assert (fetched); more_in = msg_->flags () & (msg_t::more | msg_t::label); - if (!more_in) { - current_in++; - if (current_in >= inpipes.size ()) - current_in = 0; - } return 0; } - - // Round-robin over the pipes to get the next message. - for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) { - - // Try to fetch new message. - if (inpipes [current_in].active) - prefetched = inpipes [current_in].pipe->read (&prefetched_msg); - - // If we have a message, create a prefix and return it to the caller. - if (prefetched) { - int rc = msg_->init_size (4); - errno_assert (rc == 0); - put_uint32 ((unsigned char*) msg_->data (), - inpipes [current_in].peer_id); - msg_->set_flags (msg_t::label); - return 0; - } - - // If me don't have a message, mark the pipe as passive and - // move to next pipe. - inpipes [current_in].active = false; - current_in++; - if (current_in >= inpipes.size ()) - current_in = 0; - } - - // No message is available. Initialise the output parameter - // to be a 0-byte message. - rc = msg_->init (); + + // We are at the beginning of a new message. Move the message part we + // have to the prefetched and return the ID of the peer instead. + rc = prefetched_msg.move (*msg_); + errno_assert (rc == 0); + prefetched = true; + rc = msg_->close (); errno_assert (rc == 0); - errno = EAGAIN; - return -1; + rc = msg_->init_size (4); + errno_assert (rc == 0); + put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ()); + msg_->set_flags (msg_t::label); + return 0; } int zmq::xrep_t::rollback (void) @@ -275,28 +231,9 @@ int zmq::xrep_t::rollback (void) bool zmq::xrep_t::xhas_in () { - // There are subsequent parts of the partly-read message available. - if (prefetched || more_in) + if (prefetched) return true; - - // Note that messing with current doesn't break the fairness of fair - // queueing algorithm. If there are no messages available current will - // get back to its original value. Otherwise it'll point to the first - // pipe holding messages, skipping only pipes with no messages available. - for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) { - if (inpipes [current_in].active && - inpipes [current_in].pipe->check_read ()) - return true; - - // If me don't have a message, mark the pipe as passive and - // move to next pipe. - inpipes [current_in].active = false; - current_in++; - if (current_in >= inpipes.size ()) - current_in = 0; - } - - return false; + return fq.has_in (); } bool zmq::xrep_t::xhas_out () diff --git a/src/xrep.hpp b/src/xrep.hpp index d5014e0..41c06b8 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -22,11 +22,11 @@ #define __ZMQ_XREP_HPP_INCLUDED__ #include -#include #include "socket_base.hpp" #include "stdint.hpp" #include "msg.hpp" +#include "fq.hpp" namespace zmq { @@ -58,19 +58,8 @@ namespace zmq private: - struct inpipe_t - { - class pipe_t *pipe; - uint32_t peer_id; - bool active; - }; - - // Inbound pipes with the names of corresponging peers. - typedef std::vector inpipes_t; - inpipes_t inpipes; - - // The pipe we are currently reading from. - inpipes_t::size_type current_in; + // Fair queueing object for inbound pipes. + fq_t fq; // Have we prefetched a message. bool prefetched; -- cgit v1.2.3