From d7923f08cab62ef40027a92f596ff45428870838 Mon Sep 17 00:00:00 2001 From: Fabien Ninoles Date: Fri, 17 Jun 2011 12:22:02 +0200 Subject: Add sockopt ZMQ_RCVTIMEO/ZMQ_SNDTIMEO. - Add doc and tests - Add options and setup - Wait using poll/select Signed-off-by: Fabien Ninoles Signed-off-by: Martin Sustrik --- src/socket_base.cpp | 100 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 67 insertions(+), 33 deletions(-) (limited to 'src/socket_base.cpp') diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 2b1d8af..dc6b5f5 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -288,7 +288,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, errno = EINVAL; return -1; } - int rc = process_commands (false, false); + int rc = process_commands (0, false); if (rc != 0 && (errno == EINTR || errno == ETERM)) return -1; errno_assert (rc == 0); @@ -475,7 +475,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) } // Process pending commands, if any. - int rc = process_commands (false, true); + int rc = process_commands (0, true); if (unlikely (rc != 0)) return -1; @@ -487,20 +487,38 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) rc = xsend (msg_, flags_); if (rc == 0) return 0; + if (unlikely (errno != EAGAIN)) + return -1; // In case of non-blocking send we'll simply propagate - // the error - including EAGAIN - upwards. - if (flags_ & ZMQ_DONTWAIT) + // the error - including EAGAIN - up the stack. + if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) return -1; + // Compute the time when the timeout should occur. + // If the timeout is infite, don't care. + clock_t clock ; + int timeout = options.sndtimeo; + uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout); + // Oops, we couldn't send the message. Wait for the next // command, process it and try to send the message again. - while (rc != 0) { - if (errno != EAGAIN) - return -1; - if (unlikely (process_commands (true, false) != 0)) + // If timeout is reached in the meantime, return EAGAIN. + while (true) { + if (unlikely (process_commands (timeout, false) != 0)) return -1; rc = xsend (msg_, flags_); + if (rc == 0) + break; + if (unlikely (errno != EAGAIN)) + return -1; + if (timeout > 0) { + timeout = end - clock.now_ms (); + if (timeout <= 0) { + errno = EAGAIN; + return -1; + } + } } return 0; } @@ -521,7 +539,8 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) // Get the message. int rc = xrecv (msg_, flags_); - int err = errno; + if (unlikely (rc != 0 && errno != EAGAIN)) + return -1; // Once every inbound_poll_rate messages check for signals and process // incoming commands. This happens only if we are not polling altogether @@ -532,7 +551,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) // described above) from the one used by 'send'. This is because counting // ticks is more efficient than doing RDTSC all the time. if (++ticks == inbound_poll_rate) { - if (unlikely (process_commands (false, false) != 0)) + if (unlikely (process_commands (0, false) != 0)) return -1; ticks = 0; } @@ -545,17 +564,12 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) return 0; } - // If we don't have the message, restore the original cause of the problem. - errno = err; - // If the message cannot be fetched immediately, there are two scenarios. // For non-blocking recv, commands are processed in case there's an // activate_reader command already waiting int a command pipe. // If it's not, return EAGAIN. - if (flags_ & ZMQ_DONTWAIT) { - if (errno != EAGAIN) - return -1; - if (unlikely (process_commands (false, false) != 0)) + if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) { + if (unlikely (process_commands (0, false) != 0)) return -1; ticks = 0; @@ -568,17 +582,33 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) return rc; } + // Compute the time when the timeout should occur. + // If the timeout is infite, don't care. + clock_t clock ; + int timeout = options.rcvtimeo; + uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout); + // In blocking scenario, commands are processed over and over again until // we are able to fetch a message. bool block = (ticks != 0); - while (rc != 0) { - if (errno != EAGAIN) - return -1; - if (unlikely (process_commands (block, false) != 0)) + while (true) { + if (unlikely (process_commands (block ? timeout : 0, false) != 0)) return -1; rc = xrecv (msg_, flags_); - ticks = 0; + if (rc == 0) { + ticks = 0; + break; + } + if (unlikely (errno != EAGAIN)) + return -1; block = true; + if (timeout > 0) { + timeout = end - clock.now_ms (); + if (timeout <= 0) { + errno = EAGAIN; + return -1; + } + } } rcvmore = msg_->flags () & msg_t::more; @@ -658,18 +688,20 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_) check_destroy (); } -int zmq::socket_base_t::process_commands (bool block_, bool throttle_) +int zmq::socket_base_t::process_commands (int timeout_, bool throttle_) { int rc; command_t cmd; - if (block_) { - rc = mailbox.recv (&cmd, true); - if (rc == -1 && errno == EINTR) - return -1; - errno_assert (rc == 0); + if (timeout_ != 0) { + + // If we are asked to wait, simply ask mailbox to wait. + rc = mailbox.recv (&cmd, timeout_); } else { + // If we are asked not to wait, check whether we haven't processed + // commands recently, so that we can throttle the new commands. + // Get the CPU's tick counter. If 0, the counter is not available. uint64_t tsc = zmq::clock_t::rdtsc (); @@ -690,7 +722,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) } // Check whether there are any commands pending for this thread. - rc = mailbox.recv (&cmd, false); + rc = mailbox.recv (&cmd, 0); } // Process all the commands available at the moment. @@ -701,7 +733,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) return -1; errno_assert (rc == 0); cmd.destination->process_command (cmd); - rc = mailbox.recv (&cmd, false); + rc = mailbox.recv (&cmd, 0); } if (ctx_terminated) { @@ -797,9 +829,11 @@ void zmq::socket_base_t::xhiccuped (pipe_t *pipe_) void zmq::socket_base_t::in_event () { - // Process any commands from other threads/sockets that may be available - // at the moment. Ultimately, socket will be destroyed. - process_commands (false, false); + // This function is invoked only once the socket is running in the context + // of the reaper thread. Process any commands from other threads/sockets + // that may be available at the moment. Ultimately, the socket will + // be destroyed. + process_commands (0, false); check_destroy (); } -- cgit v1.2.3