diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/xrep.cpp | 20 | ||||
-rw-r--r-- | src/xrep.hpp | 6 |
2 files changed, 18 insertions, 8 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp index 4e8d18a..978ed0b 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -26,6 +26,7 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) : socket_base_t (parent_), current_in (0), + prefetched (false), more_in (false), current_out (NULL), more_out (false) @@ -189,6 +190,13 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) // Deallocate old content of the message. zmq_msg_close (msg_); + if (prefetched) { + zmq_msg_move (msg_, &prefetched_msg); + more_in = msg_->flags & ZMQ_MSG_MORE; + prefetched = false; + return 0; + } + // If we are in the middle of reading a message, just grab next part of it. if (more_in) { zmq_assert (inpipes [current_in].active); @@ -207,21 +215,17 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) for (int count = inpipes.size (); count != 0; count--) { // Try to fetch new message. - bool fetched; - if (!inpipes [current_in].active) - fetched = false; - else - fetched = inpipes [current_in].reader->check_read (); + if (inpipes [current_in].active) + prefetched = inpipes [current_in].reader->read (&prefetched_msg); // If we have a message, create a prefix and return it to the caller. - if (fetched) { + if (prefetched) { int rc = zmq_msg_init_size (msg_, inpipes [current_in].identity.size ()); zmq_assert (rc == 0); memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (), zmq_msg_size (msg_)); msg_->flags = ZMQ_MSG_MORE; - more_in = true; return 0; } @@ -241,7 +245,7 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) bool zmq::xrep_t::xhas_in () { // There are subsequent parts of the partly-read message available. - if (more_in) + if (prefetched || more_in) return true; // Note that messing with current doesn't break the fairness of fair diff --git a/src/xrep.hpp b/src/xrep.hpp index 940d288..da1b3d8 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -67,6 +67,12 @@ namespace zmq // The pipe we are currently reading from. inpipes_t::size_type current_in; + // Have we prefetched a message. + bool prefetched; + + // Holds the prefetched message. + zmq_msg_t prefetched_msg; + // If true, more incoming message parts are expected. bool more_in; |