summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-06-22 16:51:40 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-06-22 16:51:40 +0200
commit12532c7940f23fcb3cd46208c141d47647e76231 (patch)
tree79600d31a8acb70e253bb5756a6c71dbf2332944 /src
parentec81f8fb2523e1e2fe45eaadc05311a35bf551d7 (diff)
O(1) fair-queueing in XREP implemented
Up to now the complexity of fair-queueing in XREP was O(n). Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/pipe.cpp13
-rw-r--r--src/pipe.hpp8
-rw-r--r--src/xrep.cpp117
-rw-r--r--src/xrep.hpp17
4 files changed, 50 insertions, 105 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 1813ca0..c290bae 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -63,7 +63,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
peer (NULL),
sink (NULL),
state (active),
- delay (delay_)
+ delay (delay_),
+ pipe_id (0)
{
}
@@ -85,6 +86,16 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
sink = sink_;
}
+void zmq::pipe_t::set_pipe_id (uint32_t id_)
+{
+ pipe_id = id_;
+}
+
+uint32_t zmq::pipe_t::get_pipe_id ()
+{
+ return pipe_id;
+}
+
bool zmq::pipe_t::check_read ()
{
if (unlikely (!in_active || (state != active && state != pending)))
diff --git a/src/pipe.hpp b/src/pipe.hpp
index d3bf866..df85bc2 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -25,6 +25,7 @@
#include "ypipe.hpp"
#include "config.hpp"
#include "object.hpp"
+#include "stdint.hpp"
#include "array.hpp"
namespace zmq
@@ -67,6 +68,10 @@ namespace zmq
// Specifies the object to send events to.
void set_event_sink (i_pipe_events *sink_);
+ // Pipe endpoint can store an opaque ID to be used by its clients.
+ void set_pipe_id (uint32_t id_);
+ uint32_t get_pipe_id ();
+
// Returns true if there is at least one message to read in the pipe.
bool check_read ();
@@ -176,6 +181,9 @@ namespace zmq
// asks us to.
bool delay;
+ // Opaque ID. To be used by the clients, not the pipe itself.
+ uint32_t pipe_id;
+
// Returns true if the message is delimiter; false otherwise.
static bool is_delimiter (msg_t &msg_);
diff --git a/src/xrep.cpp b/src/xrep.cpp
index ab2f0a8..4a474cf 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -26,7 +26,6 @@
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
- current_in (0),
prefetched (false),
more_in (false),
current_out (NULL),
@@ -34,14 +33,16 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
{
options.type = ZMQ_XREP;
+ prefetched_msg.init ();
+
// Start the peer ID sequence from a random point.
generate_random (&next_peer_id, sizeof (next_peer_id));
}
zmq::xrep_t::~xrep_t ()
{
- zmq_assert (inpipes.empty ());
zmq_assert (outpipes.empty ());
+ prefetched_msg.close ();
}
void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
@@ -68,8 +69,8 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
zmq_assert (ok);
// Add the pipe to the list of inbound pipes.
- inpipe_t inpipe = {pipe_, next_peer_id, true};
- inpipes.push_back (inpipe);
+ pipe_->set_pipe_id (next_peer_id);
+ fq.attach (pipe_);
// Advance next peer ID so that if new connection is dropped shortly after
// its creation we don't accidentally get two subsequent peers with
@@ -79,20 +80,8 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
void zmq::xrep_t::xterminated (pipe_t *pipe_)
{
- for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
- ++it) {
- if (it->pipe == pipe_) {
- if ((inpipes_t::size_type) (it - inpipes.begin ()) < current_in)
- current_in--;
- inpipes.erase (it);
- if (current_in >= inpipes.size ())
- current_in = 0;
- goto clean_outpipes;
- }
- }
- zmq_assert (false);
+ fq.terminated (pipe_);
-clean_outpipes:
for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) {
if (it->second.pipe == pipe_) {
@@ -107,15 +96,7 @@ clean_outpipes:
void zmq::xrep_t::xread_activated (pipe_t *pipe_)
{
- for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
- ++it) {
- if (it->pipe == pipe_) {
- zmq_assert (!it->active);
- it->active = true;
- return;
- }
- }
- zmq_assert (false);
+ fq.activated (pipe_);
}
void zmq::xrep_t::xwrite_activated (pipe_t *pipe_)
@@ -212,55 +193,30 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
return 0;
}
- // Deallocate old content of the message.
- int rc = msg_->close ();
- errno_assert (rc == 0);
+ // Get next message part.
+ pipe_t *pipe;
+ int rc = fq.recvpipe (msg_, flags_, &pipe);
+ if (rc != 0)
+ return -1;
- // If we are in the middle of reading a message, just grab next part of it.
+ // If we are in the middle of reading a message, just return the next part.
if (more_in) {
- zmq_assert (inpipes [current_in].active);
- bool fetched = inpipes [current_in].pipe->read (msg_);
- zmq_assert (fetched);
more_in = msg_->flags () & (msg_t::more | msg_t::label);
- if (!more_in) {
- current_in++;
- if (current_in >= inpipes.size ())
- current_in = 0;
- }
return 0;
}
-
- // Round-robin over the pipes to get the next message.
- for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) {
-
- // Try to fetch new message.
- if (inpipes [current_in].active)
- prefetched = inpipes [current_in].pipe->read (&prefetched_msg);
-
- // If we have a message, create a prefix and return it to the caller.
- if (prefetched) {
- int rc = msg_->init_size (4);
- errno_assert (rc == 0);
- put_uint32 ((unsigned char*) msg_->data (),
- inpipes [current_in].peer_id);
- msg_->set_flags (msg_t::label);
- return 0;
- }
-
- // If me don't have a message, mark the pipe as passive and
- // move to next pipe.
- inpipes [current_in].active = false;
- current_in++;
- if (current_in >= inpipes.size ())
- current_in = 0;
- }
-
- // No message is available. Initialise the output parameter
- // to be a 0-byte message.
- rc = msg_->init ();
+
+ // We are at the beginning of a new message. Move the message part we
+ // have to the prefetched and return the ID of the peer instead.
+ rc = prefetched_msg.move (*msg_);
+ errno_assert (rc == 0);
+ prefetched = true;
+ rc = msg_->close ();
errno_assert (rc == 0);
- errno = EAGAIN;
- return -1;
+ rc = msg_->init_size (4);
+ errno_assert (rc == 0);
+ put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ());
+ msg_->set_flags (msg_t::label);
+ return 0;
}
int zmq::xrep_t::rollback (void)
@@ -275,28 +231,9 @@ int zmq::xrep_t::rollback (void)
bool zmq::xrep_t::xhas_in ()
{
- // There are subsequent parts of the partly-read message available.
- if (prefetched || more_in)
+ if (prefetched)
return true;
-
- // Note that messing with current doesn't break the fairness of fair
- // queueing algorithm. If there are no messages available current will
- // get back to its original value. Otherwise it'll point to the first
- // pipe holding messages, skipping only pipes with no messages available.
- for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) {
- if (inpipes [current_in].active &&
- inpipes [current_in].pipe->check_read ())
- return true;
-
- // If me don't have a message, mark the pipe as passive and
- // move to next pipe.
- inpipes [current_in].active = false;
- current_in++;
- if (current_in >= inpipes.size ())
- current_in = 0;
- }
-
- return false;
+ return fq.has_in ();
}
bool zmq::xrep_t::xhas_out ()
diff --git a/src/xrep.hpp b/src/xrep.hpp
index d5014e0..41c06b8 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -22,11 +22,11 @@
#define __ZMQ_XREP_HPP_INCLUDED__
#include <map>
-#include <vector>
#include "socket_base.hpp"
#include "stdint.hpp"
#include "msg.hpp"
+#include "fq.hpp"
namespace zmq
{
@@ -58,19 +58,8 @@ namespace zmq
private:
- struct inpipe_t
- {
- class pipe_t *pipe;
- uint32_t peer_id;
- bool active;
- };
-
- // Inbound pipes with the names of corresponging peers.
- typedef std::vector <inpipe_t> inpipes_t;
- inpipes_t inpipes;
-
- // The pipe we are currently reading from.
- inpipes_t::size_type current_in;
+ // Fair queueing object for inbound pipes.
+ fq_t fq;
// Have we prefetched a message.
bool prefetched;