diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2010-05-02 20:59:07 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-05-02 20:59:07 +0200 |
commit | 84e0c7991a9b316ed571533abc628cc1175750a3 (patch) | |
tree | 73f6290a8c6633784296cc27d5e9fcbca3b075e6 /src/xreq.cpp | |
parent | 4a6bac1deaedb3c111c7e28b2933ed98367cb193 (diff) |
queue device fixed
Diffstat (limited to 'src/xreq.cpp')
-rw-r--r-- | src/xreq.cpp | 27 |
1 files changed, 24 insertions, 3 deletions
diff --git a/src/xreq.cpp b/src/xreq.cpp index 66e5cc3..ab90f68 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -23,7 +23,8 @@ #include "err.hpp" zmq::xreq_t::xreq_t (class app_thread_t *parent_) : - socket_base_t (parent_) + socket_base_t (parent_), + dropping (false) { options.requires_in = true; options.requires_out = true; @@ -77,7 +78,25 @@ int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) { - return lb.send (msg_, flags_); + while (true) { + + // If we are ignoring the current message, just drop it and return. + if (dropping) { + if (!(msg_->flags & ZMQ_MSG_MORE)) + dropping = false; + int rc = zmq_msg_close (msg_); + zmq_assert (rc == 0); + rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return 0; + } + + int rc = lb.send (msg_, flags_); + if (rc != 0 && errno == EAGAIN) + dropping = true; + else + return rc; + } } int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_) @@ -92,6 +111,8 @@ bool zmq::xreq_t::xhas_in () bool zmq::xreq_t::xhas_out () { - return lb.has_out (); + // Socket is always ready for writing. When the queue is full, message + // will be silently dropped. + return true; } |