summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-12-17 10:14:32 +0100
committerMartin Sustrik <sustrik@250bpm.com>2011-12-17 10:14:32 +0100
commit91fdedf25c4d76b0ec0aeb5d1d9f1c9a1a769447 (patch)
tree80cad8945174939473ccfc4409cb70eb53b2e459
parentf9eb763293014f812dac5558be5c5f03bb896efb (diff)
Fix polling on XREP socket
When polling on XREP socket in incoming message part was prefetched, but not the identity of sender. The problem is fixed by this patch. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--src/xrep.cpp45
-rw-r--r--src/xrep.hpp8
2 files changed, 42 insertions, 11 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp
index f304b23..520fa24 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -29,7 +29,7 @@
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
- prefetched (false),
+ prefetched (0),
more_in (false),
current_out (NULL),
more_out (false),
@@ -180,12 +180,23 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
{
+ // if there is a prefetched identity, return it.
+ if (prefetched == 2)
+ {
+ int rc = msg_->init_size (prefetched_id.size ());
+ errno_assert (rc == 0);
+ memcpy (msg_->data (), prefetched_id.data (), prefetched_id.size ());
+ msg_->set_flags (msg_t::more);
+ prefetched = 1;
+ return 0;
+ }
+
// If there is a prefetched message, return it.
- if (prefetched) {
+ if (prefetched == 1) {
int rc = msg_->move (prefetched_msg);
errno_assert (rc == 0);
more_in = msg_->flags () & msg_t::more ? true : false;
- prefetched = false;
+ prefetched = 0;
return 0;
}
@@ -235,7 +246,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// have to the prefetched and return the ID of the peer instead.
int rc = prefetched_msg.move (*msg_);
errno_assert (rc == 0);
- prefetched = true;
+ prefetched = 1;
rc = msg_->close ();
errno_assert (rc == 0);
@@ -259,16 +270,32 @@ int zmq::xrep_t::rollback (void)
bool zmq::xrep_t::xhas_in ()
{
+ // If we are in the middle of reading the messages, there are
+ // definitely more parts available.
+ if (more_in)
+ return true;
+
// We may already have a message pre-fetched.
- if (prefetched)
+ if (prefetched > 0)
return true;
- // Try to read the next message to the pre-fetch buffer.
- int rc = xrep_t::xrecv (&prefetched_msg, ZMQ_DONTWAIT);
- if (rc != 0 && errno == EAGAIN)
+ // Try to read the next message to the pre-fetch buffer. If anything,
+ // it will be identity of the peer sending the message.
+ msg_t id;
+ id.init ();
+ int rc = xrep_t::xrecv (&id, ZMQ_DONTWAIT);
+ if (rc != 0 && errno == EAGAIN) {
+ id.close ();
return false;
+ }
zmq_assert (rc == 0);
- prefetched = true;
+
+ // We have first part of the message prefetched now. We will store the
+ // prefetched identity as well.
+ prefetched_id.assign ((unsigned char*) id.data (), id.size ());
+ id.close ();
+ prefetched = 2;
+
return true;
}
diff --git a/src/xrep.hpp b/src/xrep.hpp
index df82d00..65bd564 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -67,8 +67,12 @@ namespace zmq
// Fair queueing object for inbound pipes.
fq_t fq;
- // Have we prefetched a message.
- bool prefetched;
+ // This value is either 0 (nothing is prefetched), 1 (only message body
+ // is prefetched) or 2 (both identity and message body are prefetched).
+ int prefetched;
+
+ // Holds the prefetched identity.
+ blob_t prefetched_id;
// Holds the prefetched message.
msg_t prefetched_msg;