summaryrefslogtreecommitdiff
path: root/src/xrep.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/xrep.cpp')
-rw-r--r--src/xrep.cpp117
1 files changed, 27 insertions, 90 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp
index ab2f0a8..4a474cf 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -26,7 +26,6 @@
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
- current_in (0),
prefetched (false),
more_in (false),
current_out (NULL),
@@ -34,14 +33,16 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
{
options.type = ZMQ_XREP;
+ prefetched_msg.init ();
+
// Start the peer ID sequence from a random point.
generate_random (&next_peer_id, sizeof (next_peer_id));
}
zmq::xrep_t::~xrep_t ()
{
- zmq_assert (inpipes.empty ());
zmq_assert (outpipes.empty ());
+ prefetched_msg.close ();
}
void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
@@ -68,8 +69,8 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
zmq_assert (ok);
// Add the pipe to the list of inbound pipes.
- inpipe_t inpipe = {pipe_, next_peer_id, true};
- inpipes.push_back (inpipe);
+ pipe_->set_pipe_id (next_peer_id);
+ fq.attach (pipe_);
// Advance next peer ID so that if new connection is dropped shortly after
// its creation we don't accidentally get two subsequent peers with
@@ -79,20 +80,8 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
void zmq::xrep_t::xterminated (pipe_t *pipe_)
{
- for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
- ++it) {
- if (it->pipe == pipe_) {
- if ((inpipes_t::size_type) (it - inpipes.begin ()) < current_in)
- current_in--;
- inpipes.erase (it);
- if (current_in >= inpipes.size ())
- current_in = 0;
- goto clean_outpipes;
- }
- }
- zmq_assert (false);
+ fq.terminated (pipe_);
-clean_outpipes:
for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) {
if (it->second.pipe == pipe_) {
@@ -107,15 +96,7 @@ clean_outpipes:
void zmq::xrep_t::xread_activated (pipe_t *pipe_)
{
- for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
- ++it) {
- if (it->pipe == pipe_) {
- zmq_assert (!it->active);
- it->active = true;
- return;
- }
- }
- zmq_assert (false);
+ fq.activated (pipe_);
}
void zmq::xrep_t::xwrite_activated (pipe_t *pipe_)
@@ -212,55 +193,30 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
return 0;
}
- // Deallocate old content of the message.
- int rc = msg_->close ();
- errno_assert (rc == 0);
+ // Get next message part.
+ pipe_t *pipe;
+ int rc = fq.recvpipe (msg_, flags_, &pipe);
+ if (rc != 0)
+ return -1;
- // If we are in the middle of reading a message, just grab next part of it.
+ // If we are in the middle of reading a message, just return the next part.
if (more_in) {
- zmq_assert (inpipes [current_in].active);
- bool fetched = inpipes [current_in].pipe->read (msg_);
- zmq_assert (fetched);
more_in = msg_->flags () & (msg_t::more | msg_t::label);
- if (!more_in) {
- current_in++;
- if (current_in >= inpipes.size ())
- current_in = 0;
- }
return 0;
}
-
- // Round-robin over the pipes to get the next message.
- for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) {
-
- // Try to fetch new message.
- if (inpipes [current_in].active)
- prefetched = inpipes [current_in].pipe->read (&prefetched_msg);
-
- // If we have a message, create a prefix and return it to the caller.
- if (prefetched) {
- int rc = msg_->init_size (4);
- errno_assert (rc == 0);
- put_uint32 ((unsigned char*) msg_->data (),
- inpipes [current_in].peer_id);
- msg_->set_flags (msg_t::label);
- return 0;
- }
-
- // If me don't have a message, mark the pipe as passive and
- // move to next pipe.
- inpipes [current_in].active = false;
- current_in++;
- if (current_in >= inpipes.size ())
- current_in = 0;
- }
-
- // No message is available. Initialise the output parameter
- // to be a 0-byte message.
- rc = msg_->init ();
+
+ // We are at the beginning of a new message. Move the message part we
+ // have to the prefetched and return the ID of the peer instead.
+ rc = prefetched_msg.move (*msg_);
+ errno_assert (rc == 0);
+ prefetched = true;
+ rc = msg_->close ();
errno_assert (rc == 0);
- errno = EAGAIN;
- return -1;
+ rc = msg_->init_size (4);
+ errno_assert (rc == 0);
+ put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ());
+ msg_->set_flags (msg_t::label);
+ return 0;
}
int zmq::xrep_t::rollback (void)
@@ -275,28 +231,9 @@ int zmq::xrep_t::rollback (void)
bool zmq::xrep_t::xhas_in ()
{
- // There are subsequent parts of the partly-read message available.
- if (prefetched || more_in)
+ if (prefetched)
return true;
-
- // Note that messing with current doesn't break the fairness of fair
- // queueing algorithm. If there are no messages available current will
- // get back to its original value. Otherwise it'll point to the first
- // pipe holding messages, skipping only pipes with no messages available.
- for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) {
- if (inpipes [current_in].active &&
- inpipes [current_in].pipe->check_read ())
- return true;
-
- // If me don't have a message, mark the pipe as passive and
- // move to next pipe.
- inpipes [current_in].active = false;
- current_in++;
- if (current_in >= inpipes.size ())
- current_in = 0;
- }
-
- return false;
+ return fq.has_in ();
}
bool zmq::xrep_t::xhas_out ()