From 84e0c7991a9b316ed571533abc628cc1175750a3 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 2 May 2010 20:59:07 +0200 Subject: queue device fixed --- src/xreq.cpp | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) (limited to 'src/xreq.cpp') diff --git a/src/xreq.cpp b/src/xreq.cpp index 66e5cc3..ab90f68 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -23,7 +23,8 @@ #include "err.hpp" zmq::xreq_t::xreq_t (class app_thread_t *parent_) : - socket_base_t (parent_) + socket_base_t (parent_), + dropping (false) { options.requires_in = true; options.requires_out = true; @@ -77,7 +78,25 @@ int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) { - return lb.send (msg_, flags_); + while (true) { + + // If we are ignoring the current message, just drop it and return. + if (dropping) { + if (!(msg_->flags & ZMQ_MSG_MORE)) + dropping = false; + int rc = zmq_msg_close (msg_); + zmq_assert (rc == 0); + rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return 0; + } + + int rc = lb.send (msg_, flags_); + if (rc != 0 && errno == EAGAIN) + dropping = true; + else + return rc; + } } int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_) @@ -92,6 +111,8 @@ bool zmq::xreq_t::xhas_in () bool zmq::xreq_t::xhas_out () { - return lb.has_out (); + // Socket is always ready for writing. When the queue is full, message + // will be silently dropped. + return true; } -- cgit v1.2.3