diff options
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r-- | src/socket_base.cpp | 71 |
1 files changed, 39 insertions, 32 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 4a0ed24..cdad09d 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -244,7 +244,10 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, errno = EINVAL; return -1; } - process_commands (false, false); + int rc = process_commands (false, false); + if (rc != 0 && errno == EINTR) + return -1; + errno_assert (rc == 0); *((uint32_t*) optval_) = 0; if (has_out ()) *((uint32_t*) optval_) |= ZMQ_POLLOUT; @@ -420,18 +423,16 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) } // Process pending commands, if any. - process_commands (false, true); - if (unlikely (zombie)) { - errno = ETERM; + int rc = process_commands (false, true); + if (unlikely (rc != 0)) return -1; - } // At this point we impose the MORE flag on the message. if (flags_ & ZMQ_SNDMORE) msg_->flags |= ZMQ_MSG_MORE; // Try to send the message. - int rc = xsend (msg_, flags_); + rc = xsend (msg_, flags_); if (rc == 0) return 0; @@ -445,11 +446,8 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) while (rc != 0) { if (errno != EAGAIN) return -1; - process_commands (true, false); - if (unlikely (zombie)) { - errno = ETERM; + if (unlikely (process_commands (true, false) != 0)) return -1; - } rc = xsend (msg_, flags_); } return 0; @@ -475,11 +473,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) // described above) from the one used by 'send'. This is because counting // ticks is more efficient than doing rdtsc all the time. if (++ticks == inbound_poll_rate) { - process_commands (false, false); - if (unlikely (zombie)) { - errno = ETERM; + if (unlikely (process_commands (false, false) != 0)) return -1; - } ticks = 0; } @@ -501,11 +496,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) if (flags_ & ZMQ_NOBLOCK) { if (errno != EAGAIN) return -1; - process_commands (false, false); - if (unlikely (zombie)) { - errno = ETERM; + if (unlikely (process_commands (false, false) != 0)) return -1; - } ticks = 0; rc = xrecv (msg_, flags_); @@ -522,11 +514,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) while (rc != 0) { if (errno != EAGAIN) return -1; - process_commands (true, false); - if (unlikely (zombie)) { - errno = ETERM; + if (unlikely (process_commands (true, false) != 0)) return -1; - } rc = xrecv (msg_, flags_); ticks = 0; } @@ -619,13 +608,15 @@ bool zmq::socket_base_t::dezombify () return false; } -void zmq::socket_base_t::process_commands (bool block_, bool throttle_) +int zmq::socket_base_t::process_commands (bool block_, bool throttle_) { - bool received; + int rc; command_t cmd; if (block_) { - received = signaler.recv (&cmd, true); - zmq_assert (received); + rc = signaler.recv (&cmd, true); + if (rc == -1 && errno == EINTR) + return -1; + errno_assert (rc == 0); } else { @@ -649,24 +640,40 @@ void zmq::socket_base_t::process_commands (bool block_, bool throttle_) #else #error #endif - // Check whether certain time have elapsed since last command // processing. - if (current_time - last_processing_time <= max_command_delay) - return; + if (current_time - last_processing_time <= max_command_delay) { + + // No command was processed, so the socket should + // not get into the zombie state. + zmq_assert (!zombie); + return 0; + } last_processing_time = current_time; } #endif // Check whether there are any commands pending for this thread. - received = signaler.recv (&cmd, false); + rc = signaler.recv (&cmd, false); } // Process all the commands available at the moment. - while (received) { + while (true) { + if (rc == -1 && errno == EAGAIN) + break; + if (rc == -1 && errno == EINTR) + return -1; + errno_assert (rc == 0); cmd.destination->process_command (cmd); - received = signaler.recv (&cmd, false); + rc = signaler.recv (&cmd, false); + } + + if (zombie) { + errno = ETERM; + return -1; } + + return 0; } void zmq::socket_base_t::process_stop () |