diff options
| -rw-r--r-- | include/xs.h | 1 | ||||
| -rw-r--r-- | src/options.cpp | 26 | ||||
| -rw-r--r-- | src/options.hpp | 3 | ||||
| -rw-r--r-- | src/socket_base.cpp | 18 | ||||
| -rw-r--r-- | src/socket_base.hpp | 4 | ||||
| -rw-r--r-- | src/surveyor.cpp | 21 | ||||
| -rw-r--r-- | src/surveyor.hpp | 8 | ||||
| -rw-r--r-- | tests/survey.cpp | 12 | 
8 files changed, 88 insertions, 5 deletions
| diff --git a/include/xs.h b/include/xs.h index 34d6c60..16e479d 100644 --- a/include/xs.h +++ b/include/xs.h @@ -207,6 +207,7 @@ XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval,  #define XS_IPV4ONLY 31  #define XS_KEEPALIVE 32  #define XS_PROTOCOL 33 +#define XS_SURVEY_TIMEOUT 35  /*  Message options                                                           */  #define XS_MORE 1 diff --git a/src/options.cpp b/src/options.cpp index 2a72add..f7bbdc4 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -50,6 +50,7 @@ xs::options_t::options_t () :      keepalive (0),      protocol (0),      filter (XS_FILTER_PREFIX), +    survey_timeout (-1),      delay_on_close (true),      delay_on_disconnect (true),      send_identity (false), @@ -258,6 +259,18 @@ int xs::options_t::setsockopt (int option_, const void *optval_,          filter = *((int*) optval_);          return 0; +    case XS_SURVEY_TIMEOUT: +        if (type != XS_SURVEYOR) { +            errno = ENOTSUP; +            return -1; +        } +        if (optvallen_ != sizeof (int)) { +            errno = EINVAL; +            return -1; +        } +        survey_timeout = *((int*) optval_); +        return 0; +      }      errno = EINVAL; @@ -457,6 +470,19 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)          *optvallen_ = sizeof (int);          return 0; +    case XS_SURVEY_TIMEOUT: +        if (type != XS_SURVEYOR) { +            errno = ENOTSUP; +            return -1; +        } +        if (*optvallen_ < sizeof (int)) { +            errno = EINVAL; +            return -1; +        } +        *((int*) optval_) = survey_timeout; +        *optvallen_ = sizeof (int); +        return 0; +      }      errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index fac6084..805f793 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -98,6 +98,9 @@ namespace xs          //  Filter ID to be used with subscriptions and unsubscriptions.          int filter; +        //  Timeout for the survey in milliseconds. -1 means infinite. +        int survey_timeout; +          //  If true, session reads all the pending messages from the pipe and          //  sends them to the network when socket is closed.          bool delay_on_close; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index df182f1..1f45fd6 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -552,13 +552,13 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_)      //  In case of non-blocking send we'll simply propagate      //  the error - including EAGAIN - up the stack. -    if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) +    int timeout = sndtimeo (); +    if (flags_ & XS_DONTWAIT || timeout == 0)          return -1;      //  Compute the time when the timeout should occur.      //  If the timeout is infite, don't care.       clock_t clock ; -    int timeout = options.sndtimeo;      uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);      //  Oops, we couldn't send the message. Wait for the next @@ -627,7 +627,8 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)      //  For non-blocking recv, commands are processed in case there's an      //  activate_reader command already waiting int a command pipe.      //  If it's not, return EAGAIN. -    if (flags_ & XS_DONTWAIT || options.rcvtimeo == 0) { +    int timeout = rcvtimeo (); +    if (flags_ & XS_DONTWAIT || timeout == 0) {          if (unlikely (process_commands (0, false) != 0))              return -1;          ticks = 0; @@ -643,7 +644,6 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)      //  Compute the time when the timeout should occur.      //  If the timeout is infite, don't care.       clock_t clock ; -    int timeout = options.rcvtimeo;      uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);      //  In blocking scenario, commands are processed over and over again until @@ -926,3 +926,13 @@ void xs::socket_base_t::extract_flags (msg_t *msg_)      rcvmore = msg_->flags () & msg_t::more ? true : false;  } +int xs::socket_base_t::rcvtimeo () +{ +    return options.rcvtimeo; +} + +int xs::socket_base_t::sndtimeo () +{ +    return options.sndtimeo; +} + diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 740fdde..9225ce7 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -119,6 +119,10 @@ namespace xs          virtual bool xhas_in ();          virtual int xrecv (xs::msg_t *msg_, int flags_); +        //  Allow derived classes to modify timeouts. +        virtual int rcvtimeo (); +        virtual int sndtimeo (); +          //  i_pipe_events will be forwarded to these functions.          virtual void xread_activated (pipe_t *pipe_);          virtual void xwrite_activated (pipe_t *pipe_); diff --git a/src/surveyor.cpp b/src/surveyor.cpp index c5216bd..b76128b 100644 --- a/src/surveyor.cpp +++ b/src/surveyor.cpp @@ -18,6 +18,8 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ +#include <algorithm> +  #include "surveyor.hpp"  #include "err.hpp"  #include "msg.hpp" @@ -28,7 +30,8 @@  xs::surveyor_t::surveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) :      xsurveyor_t (parent_, tid_, sid_),      receiving_responses (false), -    survey_id (generate_random ()) +    survey_id (generate_random ()), +    timeout (0)  {      options.type = XS_SURVEYOR;  } @@ -68,6 +71,12 @@ int xs::surveyor_t::xsend (msg_t *msg_, int flags_)      //  Start waiting for responses from the peers.      receiving_responses = true; +    //  Set up the timeout for the survey (-1 means infinite). +    if (!options.survey_timeout) +        timeout = -1; +    else +        timeout = clock.now_ms () + options.survey_timeout; +      return 0;  } @@ -121,6 +130,16 @@ bool xs::surveyor_t::xhas_out ()      return xsurveyor_t::xhas_out ();  } +int xs::surveyor_t::rcvtimeo () +{ +    int t = timeout - clock.now_ms (); +    if (t < 0) +        return options.rcvtimeo; +    if (options.rcvtimeo < 0) +        return t; +    return std::min (t, options.rcvtimeo);  +} +  xs::surveyor_session_t::surveyor_session_t (io_thread_t *io_thread_,        bool connect_, socket_base_t *socket_, const options_t &options_,        const char *protocol_, const char *address_) : diff --git a/src/surveyor.hpp b/src/surveyor.hpp index 83fdbb0..a77af49 100644 --- a/src/surveyor.hpp +++ b/src/surveyor.hpp @@ -23,6 +23,7 @@  #include "xsurveyor.hpp"  #include "stdint.hpp" +#include "clock.hpp"  namespace xs  { @@ -44,6 +45,7 @@ namespace xs          int xrecv (xs::msg_t *msg_, int flags_);          bool xhas_in ();          bool xhas_out (); +        int rcvtimeo ();      private: @@ -53,6 +55,12 @@ namespace xs          //  The ID of the ongoing survey.          uint32_t survey_id; +        //  The time instant when the current survey expires. +        uint64_t timeout; + +        //  Provides a way to measure time quickly. +        clock_t clock; +          surveyor_t (const surveyor_t&);          const surveyor_t &operator = (const surveyor_t&);      }; diff --git a/tests/survey.cpp b/tests/survey.cpp index 41f39aa..c6b3131 100644 --- a/tests/survey.cpp +++ b/tests/survey.cpp @@ -118,6 +118,18 @@ int XS_TEST_MAIN ()      rc = xs_recv (surveyor, buf, sizeof (buf), 0);      assert (rc == 4); +    //  Now let's test whether survey timeout works as expected. +    int timeout = 100; +    rc = xs_setsockopt (surveyor, XS_SURVEY_TIMEOUT, &timeout, sizeof (int)); +    assert (rc == 0); +    rc = xs_send (surveyor, "ABC", 3, 0); +    assert (rc == 3); +    void *watch = xs_stopwatch_start (); +    rc = xs_recv (surveyor, buf, sizeof (buf), 0); +    assert (rc == - 1 && errno == EAGAIN); +    unsigned long elapsed = xs_stopwatch_stop (watch) / 1000; +    time_assert (elapsed, (unsigned long) timeout); +      rc = xs_close (respondent2);      assert (rc == 0);      rc = xs_close (respondent1); | 
