summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/xrep.cpp20
-rw-r--r--src/xrep.hpp6
2 files changed, 18 insertions, 8 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 4e8d18a..978ed0b 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -26,6 +26,7 @@
zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
socket_base_t (parent_),
current_in (0),
+ prefetched (false),
more_in (false),
current_out (NULL),
more_out (false)
@@ -189,6 +190,13 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
// Deallocate old content of the message.
zmq_msg_close (msg_);
+ if (prefetched) {
+ zmq_msg_move (msg_, &prefetched_msg);
+ more_in = msg_->flags & ZMQ_MSG_MORE;
+ prefetched = false;
+ return 0;
+ }
+
// If we are in the middle of reading a message, just grab next part of it.
if (more_in) {
zmq_assert (inpipes [current_in].active);
@@ -207,21 +215,17 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
for (int count = inpipes.size (); count != 0; count--) {
// Try to fetch new message.
- bool fetched;
- if (!inpipes [current_in].active)
- fetched = false;
- else
- fetched = inpipes [current_in].reader->check_read ();
+ if (inpipes [current_in].active)
+ prefetched = inpipes [current_in].reader->read (&prefetched_msg);
// If we have a message, create a prefix and return it to the caller.
- if (fetched) {
+ if (prefetched) {
int rc = zmq_msg_init_size (msg_,
inpipes [current_in].identity.size ());
zmq_assert (rc == 0);
memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (),
zmq_msg_size (msg_));
msg_->flags = ZMQ_MSG_MORE;
- more_in = true;
return 0;
}
@@ -241,7 +245,7 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
bool zmq::xrep_t::xhas_in ()
{
// There are subsequent parts of the partly-read message available.
- if (more_in)
+ if (prefetched || more_in)
return true;
// Note that messing with current doesn't break the fairness of fair
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 940d288..da1b3d8 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -67,6 +67,12 @@ namespace zmq
// The pipe we are currently reading from.
inpipes_t::size_type current_in;
+ // Have we prefetched a message.
+ bool prefetched;
+
+ // Holds the prefetched message.
+ zmq_msg_t prefetched_msg;
+
// If true, more incoming message parts are expected.
bool more_in;