diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-11-13 10:33:49 +0100 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-11-13 10:33:49 +0100 |
commit | 1c239708ab174c1de9f99e256d23158f74a24dbc (patch) | |
tree | 187e7633b187510bdcf9d67255b0f5f7f51e8ceb | |
parent | f8b005502699aa069406923701af685cc156d3c2 (diff) |
Couple of bugs in XREP handling of identities fixed.
wq:
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r-- | src/xrep.cpp | 37 |
1 files changed, 22 insertions, 15 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp index ea19e56..336d4e5 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -189,14 +189,18 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) return 0; } - // Get next message part. - pipe_t *pipe; - int rc = fq.recvpipe (msg_, flags_, &pipe); - if (rc != 0) - return -1; - - // If identity is received, change the key assigned to the pipe. - if (unlikely (msg_->flags () & msg_t::identity)) { + pipe_t *pipe = NULL; + while (true) { + + // Get next message part. + int rc = fq.recvpipe (msg_, flags_, &pipe); + if (rc != 0) + return -1; + + // If identity is received, change the key assigned to the pipe. + if (likely (!(msg_->flags () & msg_t::identity))) + break; + zmq_assert (!more_in); // Empty identity means we can preserve the auto-generated identity. @@ -219,11 +223,6 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) } zmq_assert (it != outpipes.end ()); } - - // After processing the identity, try to get the next message. - rc = fq.recvpipe (msg_, flags_, &pipe); - if (rc != 0) - return -1; } // If we are in the middle of reading a message, just return the next part. @@ -234,7 +233,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) // We are at the beginning of a new message. Move the message part we // have to the prefetched and return the ID of the peer instead. - rc = prefetched_msg.move (*msg_); + int rc = prefetched_msg.move (*msg_); errno_assert (rc == 0); prefetched = true; rc = msg_->close (); @@ -260,9 +259,17 @@ int zmq::xrep_t::rollback (void) bool zmq::xrep_t::xhas_in () { + // We may already have a message pre-fetched. if (prefetched) return true; - return fq.has_in (); + + // Try to read the next message to the pre-fetch buffer. + int rc = xrecv (&prefetched_msg, ZMQ_DONTWAIT); + if (rc != 0 && errno == EAGAIN) + return false; + zmq_assert (rc == 0); + prefetched = true; + return true; } bool zmq::xrep_t::xhas_out () |