diff options
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r-- | src/socket_base.cpp | 79 |
1 files changed, 58 insertions, 21 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 24789b8..2167b0b 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -277,7 +277,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, errno = EINVAL; return -1; } - int rc = process_commands (false, false); + int rc = process_commands (0, false); if (rc != 0 && (errno == EINTR || errno == ETERM)) return -1; errno_assert (rc == 0); @@ -480,7 +480,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) } // Process pending commands, if any. - int rc = process_commands (false, true); + int rc = process_commands (0, true); if (unlikely (rc != 0)) return -1; @@ -492,20 +492,37 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) rc = xsend (msg_, flags_); if (rc == 0) return 0; + if (unlikely (errno != EAGAIN)) + return -1; // In case of non-blocking send we'll simply propagate // the error - including EAGAIN - upwards. if (flags_ & ZMQ_NOBLOCK) return -1; + // Compute the time when the timeout should occur. + // If the timeout is infite, don't care. + clock_t clock ; + int timeout = -1; + uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout); + // Oops, we couldn't send the message. Wait for the next // command, process it and try to send the message again. - while (rc != 0) { - if (errno != EAGAIN) - return -1; - if (unlikely (process_commands (true, false) != 0)) + while (true) { + if (unlikely (process_commands (timeout, false) != 0)) return -1; rc = xsend (msg_, flags_); + if (rc == 0) + break; + if (unlikely (errno != EAGAIN)) + return -1; + if (timeout > 0) { + timeout = (int) (end - clock.now_ms ()); + if (timeout <= 0) { + errno = EAGAIN; + return -1; + } + } } return 0; } @@ -526,6 +543,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) // Get the message. int rc = xrecv (msg_, flags_); + if (unlikely (rc != 0 && errno != EAGAIN)) + return -1; int err = errno; // Once every inbound_poll_rate messages check for signals and process @@ -537,7 +556,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) // described above) from the one used by 'send'. This is because counting // ticks is more efficient than doing RDTSC all the time. if (++ticks == inbound_poll_rate) { - if (unlikely (process_commands (false, false) != 0)) + if (unlikely (process_commands (0, false) != 0)) return -1; ticks = 0; } @@ -560,7 +579,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) if (flags_ & ZMQ_NOBLOCK) { if (errno != EAGAIN) return -1; - if (unlikely (process_commands (false, false) != 0)) + if (unlikely (process_commands (0, false) != 0)) return -1; ticks = 0; @@ -573,17 +592,33 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) return rc; } + // Compute the time when the timeout should occur. + // If the timeout is infite, don't care. + clock_t clock ; + int timeout = -1; + uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout); + // In blocking scenario, commands are processed over and over again until // we are able to fetch a message. bool block = (ticks != 0); - while (rc != 0) { - if (errno != EAGAIN) - return -1; - if (unlikely (process_commands (block, false) != 0)) + while (true) { + if (unlikely (process_commands (block ? timeout : 0, false) != 0)) return -1; rc = xrecv (msg_, flags_); - ticks = 0; + if (rc == 0) { + ticks = 0; + break; + } + if (unlikely (errno != EAGAIN)) + return -1; block = true; + if (timeout > 0) { + timeout = (int) (end - clock.now_ms ()); + if (timeout <= 0) { + errno = EAGAIN; + return -1; + } + } } rcvmore = msg_->flags & ZMQ_MSG_MORE; @@ -655,18 +690,20 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_) poller->set_pollin (handle); } -int zmq::socket_base_t::process_commands (bool block_, bool throttle_) +int zmq::socket_base_t::process_commands (int timeout_, bool throttle_) { int rc; command_t cmd; - if (block_) { - rc = mailbox.recv (&cmd, true); - if (rc == -1 && errno == EINTR) - return -1; - errno_assert (rc == 0); + if (timeout_ != 0) { + + // If we are asked to wait, simply ask mailbox to wait. + rc = mailbox.recv (&cmd, timeout_); } else { + // If we are asked not to wait, check whether we haven't processed + // commands recently, so that we can throttle the new commands. + // Get the CPU's tick counter. If 0, the counter is not available. uint64_t tsc = zmq::clock_t::rdtsc (); @@ -687,7 +724,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) } // Check whether there are any commands pending for this thread. - rc = mailbox.recv (&cmd, false); + rc = mailbox.recv (&cmd, 0); } // Process all the commands available at the moment. @@ -698,7 +735,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) return -1; errno_assert (rc == 0); cmd.destination->process_command (cmd); - rc = mailbox.recv (&cmd, false); + rc = mailbox.recv (&cmd, 0); } if (ctx_terminated) { |