From c2f3b3b4458187085e148850068f9719c2567614 Mon Sep 17 00:00:00 2001 From: Jon Dyte Date: Fri, 27 Aug 2010 06:59:55 +0200 Subject: forwarder and streamer devices handle multi-part messages correctly --- src/forwarder.cpp | 17 ++++++++++++++--- src/queue.cpp | 15 ++++++++------- src/streamer.cpp | 17 ++++++++++++++--- 3 files changed, 36 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/forwarder.cpp b/src/forwarder.cpp index 503868b..d1f324e 100644 --- a/src/forwarder.cpp +++ b/src/forwarder.cpp @@ -21,6 +21,7 @@ #include "forwarder.hpp" #include "socket_base.hpp" +#include "likely.hpp" #include "err.hpp" int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_) @@ -29,16 +30,26 @@ int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_) int rc = zmq_msg_init (&msg); errno_assert (rc == 0); + int64_t more; + size_t more_sz = sizeof (more); + while (true) { rc = insocket_->recv (&msg, 0); - if (rc < 0) { + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz); + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); } - rc = outsocket_->send (&msg, 0); - if (rc < 0) { + rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); diff --git a/src/queue.cpp b/src/queue.cpp index 311a8c1..36fab07 100644 --- a/src/queue.cpp +++ b/src/queue.cpp @@ -23,6 +23,7 @@ #include "queue.hpp" #include "socket_base.hpp" +#include "likely.hpp" #include "err.hpp" int zmq::queue (class socket_base_t *insocket_, @@ -49,7 +50,7 @@ int zmq::queue (class socket_base_t *insocket_, // Wait while there are either requests or replies to process. rc = zmq_poll (&items [0], 2, -1); - if (rc < 0) { + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); @@ -65,7 +66,7 @@ int zmq::queue (class socket_base_t *insocket_, while (true) { rc = insocket_->recv (&msg, 0); - if (rc < 0) { + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); @@ -73,14 +74,14 @@ int zmq::queue (class socket_base_t *insocket_, moresz = sizeof (more); rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); - if (rc < 0) { + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); } rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); - if (rc < 0) { + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); @@ -96,7 +97,7 @@ int zmq::queue (class socket_base_t *insocket_, while (true) { rc = outsocket_->recv (&msg, 0); - if (rc < 0) { + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); @@ -104,14 +105,14 @@ int zmq::queue (class socket_base_t *insocket_, moresz = sizeof (more); rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); - if (rc < 0) { + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); } rc = insocket_->send (&msg, more ? ZMQ_SNDMORE : 0); - if (rc < 0) { + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); diff --git a/src/streamer.cpp b/src/streamer.cpp index 9799007..7c03365 100644 --- a/src/streamer.cpp +++ b/src/streamer.cpp @@ -21,6 +21,7 @@ #include "streamer.hpp" #include "socket_base.hpp" +#include "likely.hpp" #include "err.hpp" int zmq::streamer (socket_base_t *insocket_, socket_base_t *outsocket_) @@ -29,16 +30,26 @@ int zmq::streamer (socket_base_t *insocket_, socket_base_t *outsocket_) int rc = zmq_msg_init (&msg); errno_assert (rc == 0); + int64_t more; + size_t more_sz = sizeof (more); + while (true) { rc = insocket_->recv (&msg, 0); - if (rc < 0) { + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz); + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); } - rc = outsocket_->send (&msg, 0); - if (rc < 0) { + rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); -- cgit v1.2.3