summaryrefslogtreecommitdiff
path: root/src/xrep.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-11-13 10:33:49 +0100
committerMartin Sustrik <sustrik@250bpm.com>2011-11-13 10:33:49 +0100
commit1c239708ab174c1de9f99e256d23158f74a24dbc (patch)
tree187e7633b187510bdcf9d67255b0f5f7f51e8ceb /src/xrep.cpp
parentf8b005502699aa069406923701af685cc156d3c2 (diff)
Couple of bugs in XREP handling of identities fixed.
wq: Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xrep.cpp')
-rw-r--r--src/xrep.cpp37
1 files changed, 22 insertions, 15 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp
index ea19e56..336d4e5 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -189,14 +189,18 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
return 0;
}
- // Get next message part.
- pipe_t *pipe;
- int rc = fq.recvpipe (msg_, flags_, &pipe);
- if (rc != 0)
- return -1;
-
- // If identity is received, change the key assigned to the pipe.
- if (unlikely (msg_->flags () & msg_t::identity)) {
+ pipe_t *pipe = NULL;
+ while (true) {
+
+ // Get next message part.
+ int rc = fq.recvpipe (msg_, flags_, &pipe);
+ if (rc != 0)
+ return -1;
+
+ // If identity is received, change the key assigned to the pipe.
+ if (likely (!(msg_->flags () & msg_t::identity)))
+ break;
+
zmq_assert (!more_in);
// Empty identity means we can preserve the auto-generated identity.
@@ -219,11 +223,6 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
}
zmq_assert (it != outpipes.end ());
}
-
- // After processing the identity, try to get the next message.
- rc = fq.recvpipe (msg_, flags_, &pipe);
- if (rc != 0)
- return -1;
}
// If we are in the middle of reading a message, just return the next part.
@@ -234,7 +233,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// We are at the beginning of a new message. Move the message part we
// have to the prefetched and return the ID of the peer instead.
- rc = prefetched_msg.move (*msg_);
+ int rc = prefetched_msg.move (*msg_);
errno_assert (rc == 0);
prefetched = true;
rc = msg_->close ();
@@ -260,9 +259,17 @@ int zmq::xrep_t::rollback (void)
bool zmq::xrep_t::xhas_in ()
{
+ // We may already have a message pre-fetched.
if (prefetched)
return true;
- return fq.has_in ();
+
+ // Try to read the next message to the pre-fetch buffer.
+ int rc = xrecv (&prefetched_msg, ZMQ_DONTWAIT);
+ if (rc != 0 && errno == EAGAIN)
+ return false;
+ zmq_assert (rc == 0);
+ prefetched = true;
+ return true;
}
bool zmq::xrep_t::xhas_out ()