From 1c239708ab174c1de9f99e256d23158f74a24dbc Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 13 Nov 2011 10:33:49 +0100 Subject: Couple of bugs in XREP handling of identities fixed. wq: Signed-off-by: Martin Sustrik --- src/xrep.cpp | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) (limited to 'src/xrep.cpp') diff --git a/src/xrep.cpp b/src/xrep.cpp index ea19e56..336d4e5 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -189,14 +189,18 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) return 0; } - // Get next message part. - pipe_t *pipe; - int rc = fq.recvpipe (msg_, flags_, &pipe); - if (rc != 0) - return -1; - - // If identity is received, change the key assigned to the pipe. - if (unlikely (msg_->flags () & msg_t::identity)) { + pipe_t *pipe = NULL; + while (true) { + + // Get next message part. + int rc = fq.recvpipe (msg_, flags_, &pipe); + if (rc != 0) + return -1; + + // If identity is received, change the key assigned to the pipe. + if (likely (!(msg_->flags () & msg_t::identity))) + break; + zmq_assert (!more_in); // Empty identity means we can preserve the auto-generated identity. @@ -219,11 +223,6 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) } zmq_assert (it != outpipes.end ()); } - - // After processing the identity, try to get the next message. - rc = fq.recvpipe (msg_, flags_, &pipe); - if (rc != 0) - return -1; } // If we are in the middle of reading a message, just return the next part. @@ -234,7 +233,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) // We are at the beginning of a new message. Move the message part we // have to the prefetched and return the ID of the peer instead. - rc = prefetched_msg.move (*msg_); + int rc = prefetched_msg.move (*msg_); errno_assert (rc == 0); prefetched = true; rc = msg_->close (); @@ -260,9 +259,17 @@ int zmq::xrep_t::rollback (void) bool zmq::xrep_t::xhas_in () { + // We may already have a message pre-fetched. if (prefetched) return true; - return fq.has_in (); + + // Try to read the next message to the pre-fetch buffer. + int rc = xrecv (&prefetched_msg, ZMQ_DONTWAIT); + if (rc != 0 && errno == EAGAIN) + return false; + zmq_assert (rc == 0); + prefetched = true; + return true; } bool zmq::xrep_t::xhas_out () -- cgit v1.2.3