diff options
-rw-r--r-- | include/xs.h | 1 | ||||
-rw-r--r-- | src/options.cpp | 26 | ||||
-rw-r--r-- | src/options.hpp | 3 | ||||
-rw-r--r-- | src/socket_base.cpp | 18 | ||||
-rw-r--r-- | src/socket_base.hpp | 4 | ||||
-rw-r--r-- | src/surveyor.cpp | 21 | ||||
-rw-r--r-- | src/surveyor.hpp | 8 | ||||
-rw-r--r-- | tests/survey.cpp | 12 |
8 files changed, 88 insertions, 5 deletions
diff --git a/include/xs.h b/include/xs.h index 34d6c60..16e479d 100644 --- a/include/xs.h +++ b/include/xs.h @@ -207,6 +207,7 @@ XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval, #define XS_IPV4ONLY 31 #define XS_KEEPALIVE 32 #define XS_PROTOCOL 33 +#define XS_SURVEY_TIMEOUT 35 /* Message options */ #define XS_MORE 1 diff --git a/src/options.cpp b/src/options.cpp index 2a72add..f7bbdc4 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -50,6 +50,7 @@ xs::options_t::options_t () : keepalive (0), protocol (0), filter (XS_FILTER_PREFIX), + survey_timeout (-1), delay_on_close (true), delay_on_disconnect (true), send_identity (false), @@ -258,6 +259,18 @@ int xs::options_t::setsockopt (int option_, const void *optval_, filter = *((int*) optval_); return 0; + case XS_SURVEY_TIMEOUT: + if (type != XS_SURVEYOR) { + errno = ENOTSUP; + return -1; + } + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + survey_timeout = *((int*) optval_); + return 0; + } errno = EINVAL; @@ -457,6 +470,19 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (int); return 0; + case XS_SURVEY_TIMEOUT: + if (type != XS_SURVEYOR) { + errno = ENOTSUP; + return -1; + } + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = survey_timeout; + *optvallen_ = sizeof (int); + return 0; + } errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index fac6084..805f793 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -98,6 +98,9 @@ namespace xs // Filter ID to be used with subscriptions and unsubscriptions. int filter; + // Timeout for the survey in milliseconds. -1 means infinite. + int survey_timeout; + // If true, session reads all the pending messages from the pipe and // sends them to the network when socket is closed. bool delay_on_close; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index df182f1..1f45fd6 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -552,13 +552,13 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_) // In case of non-blocking send we'll simply propagate // the error - including EAGAIN - up the stack. - if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) + int timeout = sndtimeo (); + if (flags_ & XS_DONTWAIT || timeout == 0) return -1; // Compute the time when the timeout should occur. // If the timeout is infite, don't care. clock_t clock ; - int timeout = options.sndtimeo; uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout); // Oops, we couldn't send the message. Wait for the next @@ -627,7 +627,8 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // For non-blocking recv, commands are processed in case there's an // activate_reader command already waiting int a command pipe. // If it's not, return EAGAIN. - if (flags_ & XS_DONTWAIT || options.rcvtimeo == 0) { + int timeout = rcvtimeo (); + if (flags_ & XS_DONTWAIT || timeout == 0) { if (unlikely (process_commands (0, false) != 0)) return -1; ticks = 0; @@ -643,7 +644,6 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // Compute the time when the timeout should occur. // If the timeout is infite, don't care. clock_t clock ; - int timeout = options.rcvtimeo; uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout); // In blocking scenario, commands are processed over and over again until @@ -926,3 +926,13 @@ void xs::socket_base_t::extract_flags (msg_t *msg_) rcvmore = msg_->flags () & msg_t::more ? true : false; } +int xs::socket_base_t::rcvtimeo () +{ + return options.rcvtimeo; +} + +int xs::socket_base_t::sndtimeo () +{ + return options.sndtimeo; +} + diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 740fdde..9225ce7 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -119,6 +119,10 @@ namespace xs virtual bool xhas_in (); virtual int xrecv (xs::msg_t *msg_, int flags_); + // Allow derived classes to modify timeouts. + virtual int rcvtimeo (); + virtual int sndtimeo (); + // i_pipe_events will be forwarded to these functions. virtual void xread_activated (pipe_t *pipe_); virtual void xwrite_activated (pipe_t *pipe_); diff --git a/src/surveyor.cpp b/src/surveyor.cpp index c5216bd..b76128b 100644 --- a/src/surveyor.cpp +++ b/src/surveyor.cpp @@ -18,6 +18,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <algorithm> + #include "surveyor.hpp" #include "err.hpp" #include "msg.hpp" @@ -28,7 +30,8 @@ xs::surveyor_t::surveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) : xsurveyor_t (parent_, tid_, sid_), receiving_responses (false), - survey_id (generate_random ()) + survey_id (generate_random ()), + timeout (0) { options.type = XS_SURVEYOR; } @@ -68,6 +71,12 @@ int xs::surveyor_t::xsend (msg_t *msg_, int flags_) // Start waiting for responses from the peers. receiving_responses = true; + // Set up the timeout for the survey (-1 means infinite). + if (!options.survey_timeout) + timeout = -1; + else + timeout = clock.now_ms () + options.survey_timeout; + return 0; } @@ -121,6 +130,16 @@ bool xs::surveyor_t::xhas_out () return xsurveyor_t::xhas_out (); } +int xs::surveyor_t::rcvtimeo () +{ + int t = timeout - clock.now_ms (); + if (t < 0) + return options.rcvtimeo; + if (options.rcvtimeo < 0) + return t; + return std::min (t, options.rcvtimeo); +} + xs::surveyor_session_t::surveyor_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : diff --git a/src/surveyor.hpp b/src/surveyor.hpp index 83fdbb0..a77af49 100644 --- a/src/surveyor.hpp +++ b/src/surveyor.hpp @@ -23,6 +23,7 @@ #include "xsurveyor.hpp" #include "stdint.hpp" +#include "clock.hpp" namespace xs { @@ -44,6 +45,7 @@ namespace xs int xrecv (xs::msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); + int rcvtimeo (); private: @@ -53,6 +55,12 @@ namespace xs // The ID of the ongoing survey. uint32_t survey_id; + // The time instant when the current survey expires. + uint64_t timeout; + + // Provides a way to measure time quickly. + clock_t clock; + surveyor_t (const surveyor_t&); const surveyor_t &operator = (const surveyor_t&); }; diff --git a/tests/survey.cpp b/tests/survey.cpp index 41f39aa..c6b3131 100644 --- a/tests/survey.cpp +++ b/tests/survey.cpp @@ -118,6 +118,18 @@ int XS_TEST_MAIN () rc = xs_recv (surveyor, buf, sizeof (buf), 0); assert (rc == 4); + // Now let's test whether survey timeout works as expected. + int timeout = 100; + rc = xs_setsockopt (surveyor, XS_SURVEY_TIMEOUT, &timeout, sizeof (int)); + assert (rc == 0); + rc = xs_send (surveyor, "ABC", 3, 0); + assert (rc == 3); + void *watch = xs_stopwatch_start (); + rc = xs_recv (surveyor, buf, sizeof (buf), 0); + assert (rc == - 1 && errno == EAGAIN); + unsigned long elapsed = xs_stopwatch_stop (watch) / 1000; + time_assert (elapsed, (unsigned long) timeout); + rc = xs_close (respondent2); assert (rc == 0); rc = xs_close (respondent1); |