diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/xreq.cpp | 26 | ||||
| -rw-r--r-- | src/xreq.hpp | 6 | 
2 files changed, 30 insertions, 2 deletions
| diff --git a/src/xreq.cpp b/src/xreq.cpp index 91317f7..59d3f39 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -24,7 +24,8 @@  #include "msg.hpp"  zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : -    socket_base_t (parent_, tid_) +    socket_base_t (parent_, tid_), +    prefetched (false)  {      options.type = ZMQ_XREQ; @@ -36,10 +37,13 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) :      options.send_identity = true;      options.recv_identity = true; + +    prefetched_msg.init ();  }  zmq::xreq_t::~xreq_t ()  { +    prefetched_msg.close ();  }  void zmq::xreq_t::xattach_pipe (pipe_t *pipe_) @@ -56,6 +60,14 @@ int zmq::xreq_t::xsend (msg_t *msg_, int flags_)  int zmq::xreq_t::xrecv (msg_t *msg_, int flags_)  { +    //  If there is a prefetched message, return it. +    if (prefetched) { +        int rc = msg_->move (prefetched_msg); +        errno_assert (rc == 0); +        prefetched = false; +        return 0; +    } +      //  XREQ socket doesn't use identities. We can safely drop it and       while (true) {          int rc = fq.recv (msg_, flags_); @@ -69,7 +81,17 @@ int zmq::xreq_t::xrecv (msg_t *msg_, int flags_)  bool zmq::xreq_t::xhas_in ()  { -    return fq.has_in (); +    //  We may already have a message pre-fetched. +    if (prefetched) +        return true; + +    //  Try to read the next message to the pre-fetch buffer. +    int rc = xrecv (&prefetched_msg, ZMQ_DONTWAIT); +    if (rc != 0 && errno == EAGAIN) +        return false; +    zmq_assert (rc == 0); +    prefetched = true; +    return true;  }  bool zmq::xreq_t::xhas_out () diff --git a/src/xreq.hpp b/src/xreq.hpp index 4c94cad..897c197 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -62,6 +62,12 @@ namespace zmq          fq_t fq;          lb_t lb; +        //  Have we prefetched a message. +        bool prefetched; + +        //  Holds the prefetched message. +        msg_t prefetched_msg; +          xreq_t (const xreq_t&);          const xreq_t &operator = (const xreq_t&);      }; | 
