summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/surveyor.cpp36
-rw-r--r--src/surveyor.hpp6
2 files changed, 37 insertions, 5 deletions
diff --git a/src/surveyor.cpp b/src/surveyor.cpp
index bd19b8a..6a789eb 100644
--- a/src/surveyor.cpp
+++ b/src/surveyor.cpp
@@ -31,13 +31,17 @@ xs::surveyor_t::surveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
xsurveyor_t (parent_, tid_, sid_),
receiving_responses (false),
survey_id (generate_random ()),
- timeout (0)
+ timeout (0),
+ has_prefetched (false)
{
options.type = XS_SURVEYOR;
+
+ prefetched.init ();
}
xs::surveyor_t::~surveyor_t ()
{
+ prefetched.close ();
}
int xs::surveyor_t::xsend (msg_t *msg_, int flags_)
@@ -90,6 +94,13 @@ int xs::surveyor_t::xrecv (msg_t *msg_, int flags_)
return -1;
}
+ // If we have a message prefetched we can return it straight away.
+ if (has_prefetched) {
+ msg_->move (prefetched);
+ has_prefetched = false;
+ return 0;
+ }
+
// Get the first part of the response -- the survey ID.
rc = xsurveyor_t::xrecv (msg_, flags_);
if (rc != 0) {
@@ -128,9 +139,26 @@ int xs::surveyor_t::xrecv (msg_t *msg_, int flags_)
bool xs::surveyor_t::xhas_in ()
{
- // TODO: We need a prefetched_message here...
- xs_assert (false);
- return false;
+ // When no survey is underway, POLLIN is never signaled.
+ if (!receiving_responses)
+ return false;
+
+ // If there's already a message prefetches signal POLLIN.
+ if (has_prefetched)
+ return true;
+
+ // Try to prefetch a message.
+ int rc = xrecv (&prefetched, XS_DONTWAIT);
+
+ // No message available.
+ if (rc != 0 && errno == EAGAIN)
+ return false;
+
+ errno_assert (rc == 0);
+
+ // We have a message prefetched. We can signal POLLIN now.
+ has_prefetched = true;
+ return true;
}
bool xs::surveyor_t::xhas_out ()
diff --git a/src/surveyor.hpp b/src/surveyor.hpp
index e79dab0..1661d9c 100644
--- a/src/surveyor.hpp
+++ b/src/surveyor.hpp
@@ -23,12 +23,12 @@
#include "xsurveyor.hpp"
#include "stdint.hpp"
+#include "msg.hpp"
namespace xs
{
class ctx_t;
- class msg_t;
class io_thread_t;
class socket_base_t;
@@ -57,6 +57,10 @@ namespace xs
// The time instant when the current survey expires.
uint64_t timeout;
+ // Inbound message prefetched during polling.
+ bool has_prefetched;
+ msg_t prefetched;
+
surveyor_t (const surveyor_t&);
const surveyor_t &operator = (const surveyor_t&);
};