summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-04-16 07:47:00 +0200
committerMartin Sustrik <sustrik@250bpm.com>2012-04-17 07:34:37 +0200
commitdc4ed5a7b59f1d5b4e4f7fb4b6e29ecaf5e6cc5c (patch)
tree29987586f7012a31c187de491edc99aea174473b
parent443d06f894751062da6d69238ce09f6fbfc27577 (diff)
XS_SURVEY_TIMEOUT socket option added.
It can be used to timeout the survey. Value is in milliseconds and -1 means infinite (default). Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--include/xs.h1
-rw-r--r--src/options.cpp26
-rw-r--r--src/options.hpp3
-rw-r--r--src/socket_base.cpp18
-rw-r--r--src/socket_base.hpp4
-rw-r--r--src/surveyor.cpp21
-rw-r--r--src/surveyor.hpp8
-rw-r--r--tests/survey.cpp12
8 files changed, 88 insertions, 5 deletions
diff --git a/include/xs.h b/include/xs.h
index 34d6c60..16e479d 100644
--- a/include/xs.h
+++ b/include/xs.h
@@ -207,6 +207,7 @@ XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval,
#define XS_IPV4ONLY 31
#define XS_KEEPALIVE 32
#define XS_PROTOCOL 33
+#define XS_SURVEY_TIMEOUT 35
/* Message options */
#define XS_MORE 1
diff --git a/src/options.cpp b/src/options.cpp
index 2a72add..f7bbdc4 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -50,6 +50,7 @@ xs::options_t::options_t () :
keepalive (0),
protocol (0),
filter (XS_FILTER_PREFIX),
+ survey_timeout (-1),
delay_on_close (true),
delay_on_disconnect (true),
send_identity (false),
@@ -258,6 +259,18 @@ int xs::options_t::setsockopt (int option_, const void *optval_,
filter = *((int*) optval_);
return 0;
+ case XS_SURVEY_TIMEOUT:
+ if (type != XS_SURVEYOR) {
+ errno = ENOTSUP;
+ return -1;
+ }
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ survey_timeout = *((int*) optval_);
+ return 0;
+
}
errno = EINVAL;
@@ -457,6 +470,19 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int);
return 0;
+ case XS_SURVEY_TIMEOUT:
+ if (type != XS_SURVEYOR) {
+ errno = ENOTSUP;
+ return -1;
+ }
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = survey_timeout;
+ *optvallen_ = sizeof (int);
+ return 0;
+
}
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index fac6084..805f793 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -98,6 +98,9 @@ namespace xs
// Filter ID to be used with subscriptions and unsubscriptions.
int filter;
+ // Timeout for the survey in milliseconds. -1 means infinite.
+ int survey_timeout;
+
// If true, session reads all the pending messages from the pipe and
// sends them to the network when socket is closed.
bool delay_on_close;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index df182f1..1f45fd6 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -552,13 +552,13 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_)
// In case of non-blocking send we'll simply propagate
// the error - including EAGAIN - up the stack.
- if (flags_ & XS_DONTWAIT || options.sndtimeo == 0)
+ int timeout = sndtimeo ();
+ if (flags_ & XS_DONTWAIT || timeout == 0)
return -1;
// Compute the time when the timeout should occur.
// If the timeout is infite, don't care.
clock_t clock ;
- int timeout = options.sndtimeo;
uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
// Oops, we couldn't send the message. Wait for the next
@@ -627,7 +627,8 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)
// For non-blocking recv, commands are processed in case there's an
// activate_reader command already waiting int a command pipe.
// If it's not, return EAGAIN.
- if (flags_ & XS_DONTWAIT || options.rcvtimeo == 0) {
+ int timeout = rcvtimeo ();
+ if (flags_ & XS_DONTWAIT || timeout == 0) {
if (unlikely (process_commands (0, false) != 0))
return -1;
ticks = 0;
@@ -643,7 +644,6 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)
// Compute the time when the timeout should occur.
// If the timeout is infite, don't care.
clock_t clock ;
- int timeout = options.rcvtimeo;
uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
// In blocking scenario, commands are processed over and over again until
@@ -926,3 +926,13 @@ void xs::socket_base_t::extract_flags (msg_t *msg_)
rcvmore = msg_->flags () & msg_t::more ? true : false;
}
+int xs::socket_base_t::rcvtimeo ()
+{
+ return options.rcvtimeo;
+}
+
+int xs::socket_base_t::sndtimeo ()
+{
+ return options.sndtimeo;
+}
+
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 740fdde..9225ce7 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -119,6 +119,10 @@ namespace xs
virtual bool xhas_in ();
virtual int xrecv (xs::msg_t *msg_, int flags_);
+ // Allow derived classes to modify timeouts.
+ virtual int rcvtimeo ();
+ virtual int sndtimeo ();
+
// i_pipe_events will be forwarded to these functions.
virtual void xread_activated (pipe_t *pipe_);
virtual void xwrite_activated (pipe_t *pipe_);
diff --git a/src/surveyor.cpp b/src/surveyor.cpp
index c5216bd..b76128b 100644
--- a/src/surveyor.cpp
+++ b/src/surveyor.cpp
@@ -18,6 +18,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <algorithm>
+
#include "surveyor.hpp"
#include "err.hpp"
#include "msg.hpp"
@@ -28,7 +30,8 @@
xs::surveyor_t::surveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
xsurveyor_t (parent_, tid_, sid_),
receiving_responses (false),
- survey_id (generate_random ())
+ survey_id (generate_random ()),
+ timeout (0)
{
options.type = XS_SURVEYOR;
}
@@ -68,6 +71,12 @@ int xs::surveyor_t::xsend (msg_t *msg_, int flags_)
// Start waiting for responses from the peers.
receiving_responses = true;
+ // Set up the timeout for the survey (-1 means infinite).
+ if (!options.survey_timeout)
+ timeout = -1;
+ else
+ timeout = clock.now_ms () + options.survey_timeout;
+
return 0;
}
@@ -121,6 +130,16 @@ bool xs::surveyor_t::xhas_out ()
return xsurveyor_t::xhas_out ();
}
+int xs::surveyor_t::rcvtimeo ()
+{
+ int t = timeout - clock.now_ms ();
+ if (t < 0)
+ return options.rcvtimeo;
+ if (options.rcvtimeo < 0)
+ return t;
+ return std::min (t, options.rcvtimeo);
+}
+
xs::surveyor_session_t::surveyor_session_t (io_thread_t *io_thread_,
bool connect_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
diff --git a/src/surveyor.hpp b/src/surveyor.hpp
index 83fdbb0..a77af49 100644
--- a/src/surveyor.hpp
+++ b/src/surveyor.hpp
@@ -23,6 +23,7 @@
#include "xsurveyor.hpp"
#include "stdint.hpp"
+#include "clock.hpp"
namespace xs
{
@@ -44,6 +45,7 @@ namespace xs
int xrecv (xs::msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
+ int rcvtimeo ();
private:
@@ -53,6 +55,12 @@ namespace xs
// The ID of the ongoing survey.
uint32_t survey_id;
+ // The time instant when the current survey expires.
+ uint64_t timeout;
+
+ // Provides a way to measure time quickly.
+ clock_t clock;
+
surveyor_t (const surveyor_t&);
const surveyor_t &operator = (const surveyor_t&);
};
diff --git a/tests/survey.cpp b/tests/survey.cpp
index 41f39aa..c6b3131 100644
--- a/tests/survey.cpp
+++ b/tests/survey.cpp
@@ -118,6 +118,18 @@ int XS_TEST_MAIN ()
rc = xs_recv (surveyor, buf, sizeof (buf), 0);
assert (rc == 4);
+ // Now let's test whether survey timeout works as expected.
+ int timeout = 100;
+ rc = xs_setsockopt (surveyor, XS_SURVEY_TIMEOUT, &timeout, sizeof (int));
+ assert (rc == 0);
+ rc = xs_send (surveyor, "ABC", 3, 0);
+ assert (rc == 3);
+ void *watch = xs_stopwatch_start ();
+ rc = xs_recv (surveyor, buf, sizeof (buf), 0);
+ assert (rc == - 1 && errno == EAGAIN);
+ unsigned long elapsed = xs_stopwatch_stop (watch) / 1000;
+ time_assert (elapsed, (unsigned long) timeout);
+
rc = xs_close (respondent2);
assert (rc == 0);
rc = xs_close (respondent1);