summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/xrep.cpp45
-rw-r--r--src/xrep.hpp8
2 files changed, 42 insertions, 11 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp
index f304b23..520fa24 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -29,7 +29,7 @@
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
- prefetched (false),
+ prefetched (0),
more_in (false),
current_out (NULL),
more_out (false),
@@ -180,12 +180,23 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
{
+ // if there is a prefetched identity, return it.
+ if (prefetched == 2)
+ {
+ int rc = msg_->init_size (prefetched_id.size ());
+ errno_assert (rc == 0);
+ memcpy (msg_->data (), prefetched_id.data (), prefetched_id.size ());
+ msg_->set_flags (msg_t::more);
+ prefetched = 1;
+ return 0;
+ }
+
// If there is a prefetched message, return it.
- if (prefetched) {
+ if (prefetched == 1) {
int rc = msg_->move (prefetched_msg);
errno_assert (rc == 0);
more_in = msg_->flags () & msg_t::more ? true : false;
- prefetched = false;
+ prefetched = 0;
return 0;
}
@@ -235,7 +246,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// have to the prefetched and return the ID of the peer instead.
int rc = prefetched_msg.move (*msg_);
errno_assert (rc == 0);
- prefetched = true;
+ prefetched = 1;
rc = msg_->close ();
errno_assert (rc == 0);
@@ -259,16 +270,32 @@ int zmq::xrep_t::rollback (void)
bool zmq::xrep_t::xhas_in ()
{
+ // If we are in the middle of reading the messages, there are
+ // definitely more parts available.
+ if (more_in)
+ return true;
+
// We may already have a message pre-fetched.
- if (prefetched)
+ if (prefetched > 0)
return true;
- // Try to read the next message to the pre-fetch buffer.
- int rc = xrep_t::xrecv (&prefetched_msg, ZMQ_DONTWAIT);
- if (rc != 0 && errno == EAGAIN)
+ // Try to read the next message to the pre-fetch buffer. If anything,
+ // it will be identity of the peer sending the message.
+ msg_t id;
+ id.init ();
+ int rc = xrep_t::xrecv (&id, ZMQ_DONTWAIT);
+ if (rc != 0 && errno == EAGAIN) {
+ id.close ();
return false;
+ }
zmq_assert (rc == 0);
- prefetched = true;
+
+ // We have first part of the message prefetched now. We will store the
+ // prefetched identity as well.
+ prefetched_id.assign ((unsigned char*) id.data (), id.size ());
+ id.close ();
+ prefetched = 2;
+
return true;
}
diff --git a/src/xrep.hpp b/src/xrep.hpp
index df82d00..65bd564 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -67,8 +67,12 @@ namespace zmq
// Fair queueing object for inbound pipes.
fq_t fq;
- // Have we prefetched a message.
- bool prefetched;
+ // This value is either 0 (nothing is prefetched), 1 (only message body
+ // is prefetched) or 2 (both identity and message body are prefetched).
+ int prefetched;
+
+ // Holds the prefetched identity.
+ blob_t prefetched_id;
// Holds the prefetched message.
msg_t prefetched_msg;