diff options
| -rw-r--r-- | src/xrep.cpp | 37 | 
1 files changed, 22 insertions, 15 deletions
| diff --git a/src/xrep.cpp b/src/xrep.cpp index ea19e56..336d4e5 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -189,14 +189,18 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)          return 0;      } -    //  Get next message part. -    pipe_t *pipe; -    int rc = fq.recvpipe (msg_, flags_, &pipe); -    if (rc != 0) -        return -1; - -    //  If identity is received, change the key assigned to the pipe. -    if (unlikely (msg_->flags () & msg_t::identity)) { +    pipe_t *pipe = NULL; +    while (true) { + +        //  Get next message part. +        int rc = fq.recvpipe (msg_, flags_, &pipe); +        if (rc != 0) +            return -1; + +        //  If identity is received, change the key assigned to the pipe. +        if (likely (!(msg_->flags () & msg_t::identity))) +            break; +          zmq_assert (!more_in);          //  Empty identity means we can preserve the auto-generated identity. @@ -219,11 +223,6 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)              }              zmq_assert (it != outpipes.end ());          } - -        //  After processing the identity, try to get the next message. -        rc = fq.recvpipe (msg_, flags_, &pipe); -        if (rc != 0) -            return -1;      }      //  If we are in the middle of reading a message, just return the next part. @@ -234,7 +233,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)      //  We are at the beginning of a new message. Move the message part we      //  have to the prefetched and return the ID of the peer instead. -    rc = prefetched_msg.move (*msg_); +    int rc = prefetched_msg.move (*msg_);      errno_assert (rc == 0);      prefetched = true;      rc = msg_->close (); @@ -260,9 +259,17 @@ int zmq::xrep_t::rollback (void)  bool zmq::xrep_t::xhas_in ()  { +    //  We may already have a message pre-fetched.      if (prefetched)          return true; -    return fq.has_in (); + +    //  Try to read the next message to the pre-fetch buffer. +    int rc = xrecv (&prefetched_msg, ZMQ_DONTWAIT); +    if (rc != 0 && errno == EAGAIN) +        return false; +    zmq_assert (rc == 0); +    prefetched = true; +    return true;  }  bool zmq::xrep_t::xhas_out () | 
