From 91fdedf25c4d76b0ec0aeb5d1d9f1c9a1a769447 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 17 Dec 2011 10:14:32 +0100 Subject: Fix polling on XREP socket When polling on XREP socket in incoming message part was prefetched, but not the identity of sender. The problem is fixed by this patch. Signed-off-by: Martin Sustrik --- src/xrep.cpp | 45 ++++++++++++++++++++++++++++++++++++--------- src/xrep.hpp | 8 ++++++-- 2 files changed, 42 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/xrep.cpp b/src/xrep.cpp index f304b23..520fa24 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -29,7 +29,7 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : socket_base_t (parent_, tid_), - prefetched (false), + prefetched (0), more_in (false), current_out (NULL), more_out (false), @@ -180,12 +180,23 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) { + // if there is a prefetched identity, return it. + if (prefetched == 2) + { + int rc = msg_->init_size (prefetched_id.size ()); + errno_assert (rc == 0); + memcpy (msg_->data (), prefetched_id.data (), prefetched_id.size ()); + msg_->set_flags (msg_t::more); + prefetched = 1; + return 0; + } + // If there is a prefetched message, return it. - if (prefetched) { + if (prefetched == 1) { int rc = msg_->move (prefetched_msg); errno_assert (rc == 0); more_in = msg_->flags () & msg_t::more ? true : false; - prefetched = false; + prefetched = 0; return 0; } @@ -235,7 +246,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) // have to the prefetched and return the ID of the peer instead. int rc = prefetched_msg.move (*msg_); errno_assert (rc == 0); - prefetched = true; + prefetched = 1; rc = msg_->close (); errno_assert (rc == 0); @@ -259,16 +270,32 @@ int zmq::xrep_t::rollback (void) bool zmq::xrep_t::xhas_in () { + // If we are in the middle of reading the messages, there are + // definitely more parts available. + if (more_in) + return true; + // We may already have a message pre-fetched. - if (prefetched) + if (prefetched > 0) return true; - // Try to read the next message to the pre-fetch buffer. - int rc = xrep_t::xrecv (&prefetched_msg, ZMQ_DONTWAIT); - if (rc != 0 && errno == EAGAIN) + // Try to read the next message to the pre-fetch buffer. If anything, + // it will be identity of the peer sending the message. + msg_t id; + id.init (); + int rc = xrep_t::xrecv (&id, ZMQ_DONTWAIT); + if (rc != 0 && errno == EAGAIN) { + id.close (); return false; + } zmq_assert (rc == 0); - prefetched = true; + + // We have first part of the message prefetched now. We will store the + // prefetched identity as well. + prefetched_id.assign ((unsigned char*) id.data (), id.size ()); + id.close (); + prefetched = 2; + return true; } diff --git a/src/xrep.hpp b/src/xrep.hpp index df82d00..65bd564 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -67,8 +67,12 @@ namespace zmq // Fair queueing object for inbound pipes. fq_t fq; - // Have we prefetched a message. - bool prefetched; + // This value is either 0 (nothing is prefetched), 1 (only message body + // is prefetched) or 2 (both identity and message body are prefetched). + int prefetched; + + // Holds the prefetched identity. + blob_t prefetched_id; // Holds the prefetched message. msg_t prefetched_msg; -- cgit v1.2.3