summaryrefslogtreecommitdiff
path: root/src/xrep.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-12-17 10:14:32 +0100
committerMartin Sustrik <sustrik@250bpm.com>2011-12-17 10:14:32 +0100
commit91fdedf25c4d76b0ec0aeb5d1d9f1c9a1a769447 (patch)
tree80cad8945174939473ccfc4409cb70eb53b2e459 /src/xrep.cpp
parentf9eb763293014f812dac5558be5c5f03bb896efb (diff)
Fix polling on XREP socket
When polling on XREP socket in incoming message part was prefetched, but not the identity of sender. The problem is fixed by this patch. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xrep.cpp')
-rw-r--r--src/xrep.cpp45
1 files changed, 36 insertions, 9 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp
index f304b23..520fa24 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -29,7 +29,7 @@
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
- prefetched (false),
+ prefetched (0),
more_in (false),
current_out (NULL),
more_out (false),
@@ -180,12 +180,23 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
{
+ // if there is a prefetched identity, return it.
+ if (prefetched == 2)
+ {
+ int rc = msg_->init_size (prefetched_id.size ());
+ errno_assert (rc == 0);
+ memcpy (msg_->data (), prefetched_id.data (), prefetched_id.size ());
+ msg_->set_flags (msg_t::more);
+ prefetched = 1;
+ return 0;
+ }
+
// If there is a prefetched message, return it.
- if (prefetched) {
+ if (prefetched == 1) {
int rc = msg_->move (prefetched_msg);
errno_assert (rc == 0);
more_in = msg_->flags () & msg_t::more ? true : false;
- prefetched = false;
+ prefetched = 0;
return 0;
}
@@ -235,7 +246,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// have to the prefetched and return the ID of the peer instead.
int rc = prefetched_msg.move (*msg_);
errno_assert (rc == 0);
- prefetched = true;
+ prefetched = 1;
rc = msg_->close ();
errno_assert (rc == 0);
@@ -259,16 +270,32 @@ int zmq::xrep_t::rollback (void)
bool zmq::xrep_t::xhas_in ()
{
+ // If we are in the middle of reading the messages, there are
+ // definitely more parts available.
+ if (more_in)
+ return true;
+
// We may already have a message pre-fetched.
- if (prefetched)
+ if (prefetched > 0)
return true;
- // Try to read the next message to the pre-fetch buffer.
- int rc = xrep_t::xrecv (&prefetched_msg, ZMQ_DONTWAIT);
- if (rc != 0 && errno == EAGAIN)
+ // Try to read the next message to the pre-fetch buffer. If anything,
+ // it will be identity of the peer sending the message.
+ msg_t id;
+ id.init ();
+ int rc = xrep_t::xrecv (&id, ZMQ_DONTWAIT);
+ if (rc != 0 && errno == EAGAIN) {
+ id.close ();
return false;
+ }
zmq_assert (rc == 0);
- prefetched = true;
+
+ // We have first part of the message prefetched now. We will store the
+ // prefetched identity as well.
+ prefetched_id.assign ((unsigned char*) id.data (), id.size ());
+ id.close ();
+ prefetched = 2;
+
return true;
}