summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp79
1 files changed, 58 insertions, 21 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 24789b8..2167b0b 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -277,7 +277,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
errno = EINVAL;
return -1;
}
- int rc = process_commands (false, false);
+ int rc = process_commands (0, false);
if (rc != 0 && (errno == EINTR || errno == ETERM))
return -1;
errno_assert (rc == 0);
@@ -480,7 +480,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
}
// Process pending commands, if any.
- int rc = process_commands (false, true);
+ int rc = process_commands (0, true);
if (unlikely (rc != 0))
return -1;
@@ -492,20 +492,37 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
rc = xsend (msg_, flags_);
if (rc == 0)
return 0;
+ if (unlikely (errno != EAGAIN))
+ return -1;
// In case of non-blocking send we'll simply propagate
// the error - including EAGAIN - upwards.
if (flags_ & ZMQ_NOBLOCK)
return -1;
+ // Compute the time when the timeout should occur.
+ // If the timeout is infite, don't care.
+ clock_t clock ;
+ int timeout = -1;
+ uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
+
// Oops, we couldn't send the message. Wait for the next
// command, process it and try to send the message again.
- while (rc != 0) {
- if (errno != EAGAIN)
- return -1;
- if (unlikely (process_commands (true, false) != 0))
+ while (true) {
+ if (unlikely (process_commands (timeout, false) != 0))
return -1;
rc = xsend (msg_, flags_);
+ if (rc == 0)
+ break;
+ if (unlikely (errno != EAGAIN))
+ return -1;
+ if (timeout > 0) {
+ timeout = (int) (end - clock.now_ms ());
+ if (timeout <= 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+ }
}
return 0;
}
@@ -526,6 +543,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// Get the message.
int rc = xrecv (msg_, flags_);
+ if (unlikely (rc != 0 && errno != EAGAIN))
+ return -1;
int err = errno;
// Once every inbound_poll_rate messages check for signals and process
@@ -537,7 +556,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// described above) from the one used by 'send'. This is because counting
// ticks is more efficient than doing RDTSC all the time.
if (++ticks == inbound_poll_rate) {
- if (unlikely (process_commands (false, false) != 0))
+ if (unlikely (process_commands (0, false) != 0))
return -1;
ticks = 0;
}
@@ -560,7 +579,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
if (flags_ & ZMQ_NOBLOCK) {
if (errno != EAGAIN)
return -1;
- if (unlikely (process_commands (false, false) != 0))
+ if (unlikely (process_commands (0, false) != 0))
return -1;
ticks = 0;
@@ -573,17 +592,33 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
return rc;
}
+ // Compute the time when the timeout should occur.
+ // If the timeout is infite, don't care.
+ clock_t clock ;
+ int timeout = -1;
+ uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
+
// In blocking scenario, commands are processed over and over again until
// we are able to fetch a message.
bool block = (ticks != 0);
- while (rc != 0) {
- if (errno != EAGAIN)
- return -1;
- if (unlikely (process_commands (block, false) != 0))
+ while (true) {
+ if (unlikely (process_commands (block ? timeout : 0, false) != 0))
return -1;
rc = xrecv (msg_, flags_);
- ticks = 0;
+ if (rc == 0) {
+ ticks = 0;
+ break;
+ }
+ if (unlikely (errno != EAGAIN))
+ return -1;
block = true;
+ if (timeout > 0) {
+ timeout = (int) (end - clock.now_ms ());
+ if (timeout <= 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+ }
}
rcvmore = msg_->flags & ZMQ_MSG_MORE;
@@ -655,18 +690,20 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
poller->set_pollin (handle);
}
-int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
+int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
{
int rc;
command_t cmd;
- if (block_) {
- rc = mailbox.recv (&cmd, true);
- if (rc == -1 && errno == EINTR)
- return -1;
- errno_assert (rc == 0);
+ if (timeout_ != 0) {
+
+ // If we are asked to wait, simply ask mailbox to wait.
+ rc = mailbox.recv (&cmd, timeout_);
}
else {
+ // If we are asked not to wait, check whether we haven't processed
+ // commands recently, so that we can throttle the new commands.
+
// Get the CPU's tick counter. If 0, the counter is not available.
uint64_t tsc = zmq::clock_t::rdtsc ();
@@ -687,7 +724,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
}
// Check whether there are any commands pending for this thread.
- rc = mailbox.recv (&cmd, false);
+ rc = mailbox.recv (&cmd, 0);
}
// Process all the commands available at the moment.
@@ -698,7 +735,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
return -1;
errno_assert (rc == 0);
cmd.destination->process_command (cmd);
- rc = mailbox.recv (&cmd, false);
+ rc = mailbox.recv (&cmd, 0);
}
if (ctx_terminated) {