diff options
author | Martin Lucina <mato@kotelna.sk> | 2010-07-30 16:49:06 +0200 |
---|---|---|
committer | Martin Lucina <mato@kotelna.sk> | 2010-07-30 16:49:06 +0200 |
commit | 544b36da68729daffefa8f40d2efed5945851a01 (patch) | |
tree | 00af12f26dd5c1986f625726c8938b80c83af2af | |
parent | 66470b2c55d74fb137211f4264bbfc5e3f454534 (diff) |
XREQ: Correct behaviour on hitting ZMQ_HWM
This reverts part of commit 84e0c7991a9b316ed571533abc628cc1175750a3 to get
correct ZMQ_HWM semantics with XREQ sockets:
When sending a message to an XREQ socket, the underlying pipe is selected in
a round-robin fashion. If an underlying pipe is full it is skipped. If there
are no underlying pipes, or all underlying pipes are full then zmq_send()
shall block or return EAGAIN, depending on whether or not the call is blocking.
Messages are never dropped.
-rw-r--r-- | src/xreq.cpp | 27 | ||||
-rw-r--r-- | src/xreq.hpp | 3 |
2 files changed, 3 insertions, 27 deletions
diff --git a/src/xreq.cpp b/src/xreq.cpp index ab90f68..66e5cc3 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -23,8 +23,7 @@ #include "err.hpp" zmq::xreq_t::xreq_t (class app_thread_t *parent_) : - socket_base_t (parent_), - dropping (false) + socket_base_t (parent_) { options.requires_in = true; options.requires_out = true; @@ -78,25 +77,7 @@ int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) { - while (true) { - - // If we are ignoring the current message, just drop it and return. - if (dropping) { - if (!(msg_->flags & ZMQ_MSG_MORE)) - dropping = false; - int rc = zmq_msg_close (msg_); - zmq_assert (rc == 0); - rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); - return 0; - } - - int rc = lb.send (msg_, flags_); - if (rc != 0 && errno == EAGAIN) - dropping = true; - else - return rc; - } + return lb.send (msg_, flags_); } int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_) @@ -111,8 +92,6 @@ bool zmq::xreq_t::xhas_in () bool zmq::xreq_t::xhas_out () { - // Socket is always ready for writing. When the queue is full, message - // will be silently dropped. - return true; + return lb.has_out (); } diff --git a/src/xreq.hpp b/src/xreq.hpp index 25a97f1..8ee0bb9 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -55,9 +55,6 @@ namespace zmq fq_t fq; lb_t lb; - // If true, curently sent message is being dropped. - bool dropping; - xreq_t (const xreq_t&); void operator = (const xreq_t&); }; |