diff options
Diffstat (limited to 'src/queue.cpp')
-rw-r--r-- | src/queue.cpp | 86 |
1 files changed, 43 insertions, 43 deletions
diff --git a/src/queue.cpp b/src/queue.cpp index 05cd125..470ea67 100644 --- a/src/queue.cpp +++ b/src/queue.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <stddef.h> + #include "../include/zmq.h" #include "queue.hpp" @@ -26,15 +28,12 @@ int zmq::queue (class socket_base_t *insocket_, class socket_base_t *outsocket_) { - zmq_msg_t request_msg; - int rc = zmq_msg_init (&request_msg); - errno_assert (rc == 0); - bool has_request = false; + zmq_msg_t msg; + int rc = zmq_msg_init (&msg); + zmq_assert (rc == 0); - zmq_msg_t response_msg; - rc = zmq_msg_init (&response_msg); - errno_assert (rc == 0); - bool has_response = false; + int64_t more; + size_t moresz; zmq_pollitem_t items [2]; items [0].socket = insocket_; @@ -47,53 +46,54 @@ int zmq::queue (class socket_base_t *insocket_, items [1].revents = 0; while (true) { + + // Wait while there are either requests or replies to process. rc = zmq_poll (&items [0], 2, -1); errno_assert (rc > 0); // The algorithm below asumes ratio of request and replies processed - // under full load to be 1:1. While processing requests replies first - // is tempting it is suspectible to DoS attacks (overloading the system - // with unsolicited replies). + // under full load to be 1:1. Although processing requests replies + // first is tempting it is suspectible to DoS attacks (overloading + // the system with unsolicited replies). - // Receive a new request. + // Process a request. if (items [0].revents & ZMQ_POLLIN) { - zmq_assert (!has_request); - rc = insocket_->recv (&request_msg, ZMQ_NOBLOCK); - errno_assert (rc == 0); - items [0].events &= ~ZMQ_POLLIN; - items [1].events |= ZMQ_POLLOUT; - has_request = true; - } + while (true) { + + rc = insocket_->recv (&msg, 0); + errno_assert (rc == 0); + + moresz = sizeof (more); + rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + errno_assert (rc == 0); + + rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); + errno_assert (rc == 0); - // Send the request further. - if (items [1].revents & ZMQ_POLLOUT) { - zmq_assert (has_request); - rc = outsocket_->send (&request_msg, ZMQ_NOBLOCK); - errno_assert (rc == 0); - items [0].events |= ZMQ_POLLIN; - items [1].events &= ~ZMQ_POLLOUT; - has_request = false; + if (!more) + break; + } } - // Get a new reply. + // Process a reply. if (items [1].revents & ZMQ_POLLIN) { - zmq_assert (!has_response); - rc = outsocket_->recv (&response_msg, ZMQ_NOBLOCK); - errno_assert (rc == 0); - items [0].events |= ZMQ_POLLOUT; - items [1].events &= ~ZMQ_POLLIN; - has_response = true; - } + while (true) { + + rc = outsocket_->recv (&msg, 0); + errno_assert (rc == 0); + + moresz = sizeof (more); + rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + errno_assert (rc == 0); - // Send the reply further. - if (items [0].revents & ZMQ_POLLOUT) {\ - zmq_assert (has_response); - rc = insocket_->send (&response_msg, ZMQ_NOBLOCK); - errno_assert (rc == 0); - items [0].events &= ~ZMQ_POLLOUT; - items [1].events |= ZMQ_POLLIN; - has_response = false; + rc = insocket_->send (&msg, more ? ZMQ_SNDMORE : 0); + errno_assert (rc == 0); + + if (!more) + break; + } } + } return 0; |