diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2010-05-02 20:59:07 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-05-02 20:59:07 +0200 |
commit | 84e0c7991a9b316ed571533abc628cc1175750a3 (patch) | |
tree | 73f6290a8c6633784296cc27d5e9fcbca3b075e6 /src | |
parent | 4a6bac1deaedb3c111c7e28b2933ed98367cb193 (diff) |
queue device fixed
Diffstat (limited to 'src')
-rw-r--r-- | src/queue.cpp | 86 | ||||
-rw-r--r-- | src/xreq.cpp | 27 | ||||
-rw-r--r-- | src/xreq.hpp | 3 |
3 files changed, 70 insertions, 46 deletions
diff --git a/src/queue.cpp b/src/queue.cpp index 05cd125..470ea67 100644 --- a/src/queue.cpp +++ b/src/queue.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <stddef.h> + #include "../include/zmq.h" #include "queue.hpp" @@ -26,15 +28,12 @@ int zmq::queue (class socket_base_t *insocket_, class socket_base_t *outsocket_) { - zmq_msg_t request_msg; - int rc = zmq_msg_init (&request_msg); - errno_assert (rc == 0); - bool has_request = false; + zmq_msg_t msg; + int rc = zmq_msg_init (&msg); + zmq_assert (rc == 0); - zmq_msg_t response_msg; - rc = zmq_msg_init (&response_msg); - errno_assert (rc == 0); - bool has_response = false; + int64_t more; + size_t moresz; zmq_pollitem_t items [2]; items [0].socket = insocket_; @@ -47,53 +46,54 @@ int zmq::queue (class socket_base_t *insocket_, items [1].revents = 0; while (true) { + + // Wait while there are either requests or replies to process. rc = zmq_poll (&items [0], 2, -1); errno_assert (rc > 0); // The algorithm below asumes ratio of request and replies processed - // under full load to be 1:1. While processing requests replies first - // is tempting it is suspectible to DoS attacks (overloading the system - // with unsolicited replies). + // under full load to be 1:1. Although processing requests replies + // first is tempting it is suspectible to DoS attacks (overloading + // the system with unsolicited replies). - // Receive a new request. + // Process a request. if (items [0].revents & ZMQ_POLLIN) { - zmq_assert (!has_request); - rc = insocket_->recv (&request_msg, ZMQ_NOBLOCK); - errno_assert (rc == 0); - items [0].events &= ~ZMQ_POLLIN; - items [1].events |= ZMQ_POLLOUT; - has_request = true; - } + while (true) { + + rc = insocket_->recv (&msg, 0); + errno_assert (rc == 0); + + moresz = sizeof (more); + rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + errno_assert (rc == 0); + + rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); + errno_assert (rc == 0); - // Send the request further. - if (items [1].revents & ZMQ_POLLOUT) { - zmq_assert (has_request); - rc = outsocket_->send (&request_msg, ZMQ_NOBLOCK); - errno_assert (rc == 0); - items [0].events |= ZMQ_POLLIN; - items [1].events &= ~ZMQ_POLLOUT; - has_request = false; + if (!more) + break; + } } - // Get a new reply. + // Process a reply. if (items [1].revents & ZMQ_POLLIN) { - zmq_assert (!has_response); - rc = outsocket_->recv (&response_msg, ZMQ_NOBLOCK); - errno_assert (rc == 0); - items [0].events |= ZMQ_POLLOUT; - items [1].events &= ~ZMQ_POLLIN; - has_response = true; - } + while (true) { + + rc = outsocket_->recv (&msg, 0); + errno_assert (rc == 0); + + moresz = sizeof (more); + rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + errno_assert (rc == 0); - // Send the reply further. - if (items [0].revents & ZMQ_POLLOUT) {\ - zmq_assert (has_response); - rc = insocket_->send (&response_msg, ZMQ_NOBLOCK); - errno_assert (rc == 0); - items [0].events &= ~ZMQ_POLLOUT; - items [1].events |= ZMQ_POLLIN; - has_response = false; + rc = insocket_->send (&msg, more ? ZMQ_SNDMORE : 0); + errno_assert (rc == 0); + + if (!more) + break; + } } + } return 0; diff --git a/src/xreq.cpp b/src/xreq.cpp index 66e5cc3..ab90f68 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -23,7 +23,8 @@ #include "err.hpp" zmq::xreq_t::xreq_t (class app_thread_t *parent_) : - socket_base_t (parent_) + socket_base_t (parent_), + dropping (false) { options.requires_in = true; options.requires_out = true; @@ -77,7 +78,25 @@ int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) { - return lb.send (msg_, flags_); + while (true) { + + // If we are ignoring the current message, just drop it and return. + if (dropping) { + if (!(msg_->flags & ZMQ_MSG_MORE)) + dropping = false; + int rc = zmq_msg_close (msg_); + zmq_assert (rc == 0); + rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return 0; + } + + int rc = lb.send (msg_, flags_); + if (rc != 0 && errno == EAGAIN) + dropping = true; + else + return rc; + } } int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_) @@ -92,6 +111,8 @@ bool zmq::xreq_t::xhas_in () bool zmq::xreq_t::xhas_out () { - return lb.has_out (); + // Socket is always ready for writing. When the queue is full, message + // will be silently dropped. + return true; } diff --git a/src/xreq.hpp b/src/xreq.hpp index 8ee0bb9..25a97f1 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -55,6 +55,9 @@ namespace zmq fq_t fq; lb_t lb; + // If true, curently sent message is being dropped. + bool dropping; + xreq_t (const xreq_t&); void operator = (const xreq_t&); }; |