diff options
Diffstat (limited to 'src/xrep.cpp')
-rw-r--r-- | src/xrep.cpp | 117 |
1 files changed, 27 insertions, 90 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp index ab2f0a8..4a474cf 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -26,7 +26,6 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : socket_base_t (parent_, tid_), - current_in (0), prefetched (false), more_in (false), current_out (NULL), @@ -34,14 +33,16 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : { options.type = ZMQ_XREP; + prefetched_msg.init (); + // Start the peer ID sequence from a random point. generate_random (&next_peer_id, sizeof (next_peer_id)); } zmq::xrep_t::~xrep_t () { - zmq_assert (inpipes.empty ()); zmq_assert (outpipes.empty ()); + prefetched_msg.close (); } void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) @@ -68,8 +69,8 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) zmq_assert (ok); // Add the pipe to the list of inbound pipes. - inpipe_t inpipe = {pipe_, next_peer_id, true}; - inpipes.push_back (inpipe); + pipe_->set_pipe_id (next_peer_id); + fq.attach (pipe_); // Advance next peer ID so that if new connection is dropped shortly after // its creation we don't accidentally get two subsequent peers with @@ -79,20 +80,8 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) void zmq::xrep_t::xterminated (pipe_t *pipe_) { - for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); - ++it) { - if (it->pipe == pipe_) { - if ((inpipes_t::size_type) (it - inpipes.begin ()) < current_in) - current_in--; - inpipes.erase (it); - if (current_in >= inpipes.size ()) - current_in = 0; - goto clean_outpipes; - } - } - zmq_assert (false); + fq.terminated (pipe_); -clean_outpipes: for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); ++it) { if (it->second.pipe == pipe_) { @@ -107,15 +96,7 @@ clean_outpipes: void zmq::xrep_t::xread_activated (pipe_t *pipe_) { - for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); - ++it) { - if (it->pipe == pipe_) { - zmq_assert (!it->active); - it->active = true; - return; - } - } - zmq_assert (false); + fq.activated (pipe_); } void zmq::xrep_t::xwrite_activated (pipe_t *pipe_) @@ -212,55 +193,30 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) return 0; } - // Deallocate old content of the message. - int rc = msg_->close (); - errno_assert (rc == 0); + // Get next message part. + pipe_t *pipe; + int rc = fq.recvpipe (msg_, flags_, &pipe); + if (rc != 0) + return -1; - // If we are in the middle of reading a message, just grab next part of it. + // If we are in the middle of reading a message, just return the next part. if (more_in) { - zmq_assert (inpipes [current_in].active); - bool fetched = inpipes [current_in].pipe->read (msg_); - zmq_assert (fetched); more_in = msg_->flags () & (msg_t::more | msg_t::label); - if (!more_in) { - current_in++; - if (current_in >= inpipes.size ()) - current_in = 0; - } return 0; } - - // Round-robin over the pipes to get the next message. - for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) { - - // Try to fetch new message. - if (inpipes [current_in].active) - prefetched = inpipes [current_in].pipe->read (&prefetched_msg); - - // If we have a message, create a prefix and return it to the caller. - if (prefetched) { - int rc = msg_->init_size (4); - errno_assert (rc == 0); - put_uint32 ((unsigned char*) msg_->data (), - inpipes [current_in].peer_id); - msg_->set_flags (msg_t::label); - return 0; - } - - // If me don't have a message, mark the pipe as passive and - // move to next pipe. - inpipes [current_in].active = false; - current_in++; - if (current_in >= inpipes.size ()) - current_in = 0; - } - - // No message is available. Initialise the output parameter - // to be a 0-byte message. - rc = msg_->init (); + + // We are at the beginning of a new message. Move the message part we + // have to the prefetched and return the ID of the peer instead. + rc = prefetched_msg.move (*msg_); + errno_assert (rc == 0); + prefetched = true; + rc = msg_->close (); errno_assert (rc == 0); - errno = EAGAIN; - return -1; + rc = msg_->init_size (4); + errno_assert (rc == 0); + put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ()); + msg_->set_flags (msg_t::label); + return 0; } int zmq::xrep_t::rollback (void) @@ -275,28 +231,9 @@ int zmq::xrep_t::rollback (void) bool zmq::xrep_t::xhas_in () { - // There are subsequent parts of the partly-read message available. - if (prefetched || more_in) + if (prefetched) return true; - - // Note that messing with current doesn't break the fairness of fair - // queueing algorithm. If there are no messages available current will - // get back to its original value. Otherwise it'll point to the first - // pipe holding messages, skipping only pipes with no messages available. - for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) { - if (inpipes [current_in].active && - inpipes [current_in].pipe->check_read ()) - return true; - - // If me don't have a message, mark the pipe as passive and - // move to next pipe. - inpipes [current_in].active = false; - current_in++; - if (current_in >= inpipes.size ()) - current_in = 0; - } - - return false; + return fq.has_in (); } bool zmq::xrep_t::xhas_out () |