summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp71
1 files changed, 39 insertions, 32 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 4a0ed24..cdad09d 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -244,7 +244,10 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
errno = EINVAL;
return -1;
}
- process_commands (false, false);
+ int rc = process_commands (false, false);
+ if (rc != 0 && errno == EINTR)
+ return -1;
+ errno_assert (rc == 0);
*((uint32_t*) optval_) = 0;
if (has_out ())
*((uint32_t*) optval_) |= ZMQ_POLLOUT;
@@ -420,18 +423,16 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
}
// Process pending commands, if any.
- process_commands (false, true);
- if (unlikely (zombie)) {
- errno = ETERM;
+ int rc = process_commands (false, true);
+ if (unlikely (rc != 0))
return -1;
- }
// At this point we impose the MORE flag on the message.
if (flags_ & ZMQ_SNDMORE)
msg_->flags |= ZMQ_MSG_MORE;
// Try to send the message.
- int rc = xsend (msg_, flags_);
+ rc = xsend (msg_, flags_);
if (rc == 0)
return 0;
@@ -445,11 +446,8 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
while (rc != 0) {
if (errno != EAGAIN)
return -1;
- process_commands (true, false);
- if (unlikely (zombie)) {
- errno = ETERM;
+ if (unlikely (process_commands (true, false) != 0))
return -1;
- }
rc = xsend (msg_, flags_);
}
return 0;
@@ -475,11 +473,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// described above) from the one used by 'send'. This is because counting
// ticks is more efficient than doing rdtsc all the time.
if (++ticks == inbound_poll_rate) {
- process_commands (false, false);
- if (unlikely (zombie)) {
- errno = ETERM;
+ if (unlikely (process_commands (false, false) != 0))
return -1;
- }
ticks = 0;
}
@@ -501,11 +496,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
if (flags_ & ZMQ_NOBLOCK) {
if (errno != EAGAIN)
return -1;
- process_commands (false, false);
- if (unlikely (zombie)) {
- errno = ETERM;
+ if (unlikely (process_commands (false, false) != 0))
return -1;
- }
ticks = 0;
rc = xrecv (msg_, flags_);
@@ -522,11 +514,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
while (rc != 0) {
if (errno != EAGAIN)
return -1;
- process_commands (true, false);
- if (unlikely (zombie)) {
- errno = ETERM;
+ if (unlikely (process_commands (true, false) != 0))
return -1;
- }
rc = xrecv (msg_, flags_);
ticks = 0;
}
@@ -619,13 +608,15 @@ bool zmq::socket_base_t::dezombify ()
return false;
}
-void zmq::socket_base_t::process_commands (bool block_, bool throttle_)
+int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
{
- bool received;
+ int rc;
command_t cmd;
if (block_) {
- received = signaler.recv (&cmd, true);
- zmq_assert (received);
+ rc = signaler.recv (&cmd, true);
+ if (rc == -1 && errno == EINTR)
+ return -1;
+ errno_assert (rc == 0);
}
else {
@@ -649,24 +640,40 @@ void zmq::socket_base_t::process_commands (bool block_, bool throttle_)
#else
#error
#endif
-
// Check whether certain time have elapsed since last command
// processing.
- if (current_time - last_processing_time <= max_command_delay)
- return;
+ if (current_time - last_processing_time <= max_command_delay) {
+
+ // No command was processed, so the socket should
+ // not get into the zombie state.
+ zmq_assert (!zombie);
+ return 0;
+ }
last_processing_time = current_time;
}
#endif
// Check whether there are any commands pending for this thread.
- received = signaler.recv (&cmd, false);
+ rc = signaler.recv (&cmd, false);
}
// Process all the commands available at the moment.
- while (received) {
+ while (true) {
+ if (rc == -1 && errno == EAGAIN)
+ break;
+ if (rc == -1 && errno == EINTR)
+ return -1;
+ errno_assert (rc == 0);
cmd.destination->process_command (cmd);
- received = signaler.recv (&cmd, false);
+ rc = signaler.recv (&cmd, false);
+ }
+
+ if (zombie) {
+ errno = ETERM;
+ return -1;
}
+
+ return 0;
}
void zmq::socket_base_t::process_stop ()