diff options
-rw-r--r-- | doc/zmq_getsockopt.txt | 2 | ||||
-rw-r--r-- | doc/zmq_poll.txt | 2 | ||||
-rw-r--r-- | doc/zmq_recv.txt | 3 | ||||
-rw-r--r-- | doc/zmq_send.txt | 3 | ||||
-rw-r--r-- | doc/zmq_setsockopt.txt | 2 | ||||
-rw-r--r-- | src/ctx.cpp | 5 | ||||
-rw-r--r-- | src/ctx.hpp | 3 | ||||
-rw-r--r-- | src/io_thread.cpp | 8 | ||||
-rw-r--r-- | src/signaler.cpp | 47 | ||||
-rw-r--r-- | src/signaler.hpp | 2 | ||||
-rw-r--r-- | src/socket_base.cpp | 71 | ||||
-rw-r--r-- | src/socket_base.hpp | 3 | ||||
-rw-r--r-- | src/zmq.cpp | 26 |
13 files changed, 93 insertions, 84 deletions
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index 1e36a2a..d179e79 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -216,6 +216,8 @@ option value. The 0MQ 'context' associated with the specified 'socket' was terminated. *EFAULT*:: The provided 'socket' was not valid (NULL). +*EINTR*:: +The operation was interrupted by delivery of a signal. EXAMPLE diff --git a/doc/zmq_poll.txt b/doc/zmq_poll.txt index fe2a209..92c0a3d 100644 --- a/doc/zmq_poll.txt +++ b/doc/zmq_poll.txt @@ -98,6 +98,8 @@ At least one of the members of the 'items' array refers to a 'socket' whose associated 0MQ 'context' was terminated. *EFAULT*:: The provided 'items' was not valid (NULL). +*EINTR*:: +The poll was interrupted by delivery of a signal before any event was available. EXAMPLE diff --git a/doc/zmq_recv.txt b/doc/zmq_recv.txt index dc60af6..dbbbd75 100644 --- a/doc/zmq_recv.txt +++ b/doc/zmq_recv.txt @@ -65,6 +65,9 @@ _messaging patterns_ section of linkzmq:zmq_socket[3] for more information. The 0MQ 'context' associated with the specified 'socket' was terminated. *EFAULT*:: The provided 'socket' was not valid (NULL). +*EINTR*:: +The operation was interrupted by delivery of a signal before a message was +available. EXAMPLE diff --git a/doc/zmq_send.txt b/doc/zmq_send.txt index 793d1a8..231dfcc 100644 --- a/doc/zmq_send.txt +++ b/doc/zmq_send.txt @@ -71,6 +71,9 @@ _messaging patterns_ section of linkzmq:zmq_socket[3] for more information. The 0MQ 'context' associated with the specified 'socket' was terminated. *EFAULT*:: The provided 'context' was not valid (NULL). +*EINTR*:: +The operation was interrupted by delivery of a signal before the message was +sent. EXAMPLE diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 1b551c6..a60fc26 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -231,6 +231,8 @@ _option_value_ is invalid. The 0MQ 'context' associated with the specified 'socket' was terminated. *EFAULT*:: The provided 'socket' was not valid (NULL). +*EINTR*:: +The operation was interrupted by delivery of a signal. EXAMPLE diff --git a/src/ctx.cpp b/src/ctx.cpp index 65c5316..2660e1f 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -219,11 +219,6 @@ void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_) slots [slot_]->send (command_); } -bool zmq::ctx_t::recv_command (uint32_t slot_, command_t *command_, bool block_) -{ - return slots [slot_]->recv (command_, block_); -} - zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) { // Find the I/O thread with minimum load. diff --git a/src/ctx.hpp b/src/ctx.hpp index 5f6cc83..360ca0e 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -64,9 +64,6 @@ namespace zmq // Send command to the destination slot. void send_command (uint32_t slot_, const command_t &command_); - // Receive command from the source slot. - bool recv_command (uint32_t slot_, command_t *command_, bool block_); - // Returns the I/O thread that is the least busy at the moment. // Taskset specifies which I/O threads are eligible (0 = all). class io_thread_t *choose_io_thread (uint64_t taskset_); diff --git a/src/io_thread.cpp b/src/io_thread.cpp index 3d202cf..d1d95f3 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -71,8 +71,12 @@ void zmq::io_thread_t::in_event () // Get the next command. If there is none, exit. command_t cmd; - if (!signaler.recv (&cmd, false)) - break; + int rc = signaler.recv (&cmd, false); + if (rc != 0 && errno == EINTR) + continue; + if (rc != 0 && errno == EAGAIN) + break; + errno_assert (rc == 0); // Process the command. cmd.destination->process_command (cmd); diff --git a/src/signaler.cpp b/src/signaler.cpp index d4a9214..c14a709 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -112,7 +112,7 @@ void zmq::signaler_t::send (const command_t &cmd_) zmq_assert (rc == sizeof (command_t)); } -bool zmq::signaler_t::recv (command_t *cmd_, bool block_) +int zmq::signaler_t::recv (command_t *cmd_, bool block_) { if (block_) { @@ -122,10 +122,12 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_) wsa_assert (rc != SOCKET_ERROR); } - bool result; + int err; + int result; int nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0); if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) { - result = false; + err = EAGAIN; + result = -1; } else { wsa_assert (nbytes != -1); @@ -133,7 +135,7 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_) // Check whether we haven't got half of a signal. zmq_assert (nbytes % sizeof (uint32_t) == 0); - result = true; + result = 0; } if (block_) { @@ -144,6 +146,8 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_) wsa_assert (rc != SOCKET_ERROR); } + if (result == -1) + errno = err; return result; } @@ -184,7 +188,7 @@ void zmq::signaler_t::send (const command_t &cmd_) zmq_assert (nbytes == sizeof (command_t)); } -bool zmq::signaler_t::recv (command_t *cmd_, bool block_) +int zmq::signaler_t::recv (command_t *cmd_, bool block_) { if (block_) { @@ -196,13 +200,12 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_) errno_assert (rc != -1); } - bool result; - ssize_t nbytes; - do { - nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0); - } while (nbytes == -1 && errno == EINTR); - if (nbytes == -1 && errno == EAGAIN) { - result = false; + int err; + int result; + ssize_t nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0); + if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) { + err = errno; + result = -1; } else { zmq_assert (nbytes != -1); @@ -210,7 +213,7 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_) // Check whether we haven't got half of command. zmq_assert (nbytes == sizeof (command_t)); - result = true; + result = 0; } if (block_) { @@ -223,6 +226,8 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_) errno_assert (rc != -1); } + if (result == -1) + errno = err; return result; } @@ -266,24 +271,18 @@ void zmq::signaler_t::send (const command_t &cmd_) zmq_assert (nbytes == sizeof (command_t)); } -bool zmq::signaler_t::recv (command_t *cmd_, bool block_) +int zmq::signaler_t::recv (command_t *cmd_, bool block_) { ssize_t nbytes; - do { - nbytes = ::recv (r, cmd_, sizeof (command_t), - block_ ? 0 : MSG_DONTWAIT); - } while (nbytes == -1 && errno == EINTR); - - // If there's no signal available return false. - if (nbytes == -1 && errno == EAGAIN) - return false; - + nbytes = ::recv (r, cmd_, sizeof (command_t), block_ ? 0 : MSG_DONTWAIT); + if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) + return -1; errno_assert (nbytes != -1); // Check whether we haven't got half of command. zmq_assert (nbytes == sizeof (command_t)); - return true; + return 0; } #endif diff --git a/src/signaler.hpp b/src/signaler.hpp index 64a1899..217c3a6 100644 --- a/src/signaler.hpp +++ b/src/signaler.hpp @@ -40,7 +40,7 @@ namespace zmq fd_t get_fd (); void send (const command_t &cmd_); - bool recv (command_t *cmd_, bool block_); + int recv (command_t *cmd_, bool block_); private: diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 4a0ed24..cdad09d 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -244,7 +244,10 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, errno = EINVAL; return -1; } - process_commands (false, false); + int rc = process_commands (false, false); + if (rc != 0 && errno == EINTR) + return -1; + errno_assert (rc == 0); *((uint32_t*) optval_) = 0; if (has_out ()) *((uint32_t*) optval_) |= ZMQ_POLLOUT; @@ -420,18 +423,16 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) } // Process pending commands, if any. - process_commands (false, true); - if (unlikely (zombie)) { - errno = ETERM; + int rc = process_commands (false, true); + if (unlikely (rc != 0)) return -1; - } // At this point we impose the MORE flag on the message. if (flags_ & ZMQ_SNDMORE) msg_->flags |= ZMQ_MSG_MORE; // Try to send the message. - int rc = xsend (msg_, flags_); + rc = xsend (msg_, flags_); if (rc == 0) return 0; @@ -445,11 +446,8 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) while (rc != 0) { if (errno != EAGAIN) return -1; - process_commands (true, false); - if (unlikely (zombie)) { - errno = ETERM; + if (unlikely (process_commands (true, false) != 0)) return -1; - } rc = xsend (msg_, flags_); } return 0; @@ -475,11 +473,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) // described above) from the one used by 'send'. This is because counting // ticks is more efficient than doing rdtsc all the time. if (++ticks == inbound_poll_rate) { - process_commands (false, false); - if (unlikely (zombie)) { - errno = ETERM; + if (unlikely (process_commands (false, false) != 0)) return -1; - } ticks = 0; } @@ -501,11 +496,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) if (flags_ & ZMQ_NOBLOCK) { if (errno != EAGAIN) return -1; - process_commands (false, false); - if (unlikely (zombie)) { - errno = ETERM; + if (unlikely (process_commands (false, false) != 0)) return -1; - } ticks = 0; rc = xrecv (msg_, flags_); @@ -522,11 +514,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) while (rc != 0) { if (errno != EAGAIN) return -1; - process_commands (true, false); - if (unlikely (zombie)) { - errno = ETERM; + if (unlikely (process_commands (true, false) != 0)) return -1; - } rc = xrecv (msg_, flags_); ticks = 0; } @@ -619,13 +608,15 @@ bool zmq::socket_base_t::dezombify () return false; } -void zmq::socket_base_t::process_commands (bool block_, bool throttle_) +int zmq::socket_base_t::process_commands (bool block_, bool throttle_) { - bool received; + int rc; command_t cmd; if (block_) { - received = signaler.recv (&cmd, true); - zmq_assert (received); + rc = signaler.recv (&cmd, true); + if (rc == -1 && errno == EINTR) + return -1; + errno_assert (rc == 0); } else { @@ -649,24 +640,40 @@ void zmq::socket_base_t::process_commands (bool block_, bool throttle_) #else #error #endif - // Check whether certain time have elapsed since last command // processing. - if (current_time - last_processing_time <= max_command_delay) - return; + if (current_time - last_processing_time <= max_command_delay) { + + // No command was processed, so the socket should + // not get into the zombie state. + zmq_assert (!zombie); + return 0; + } last_processing_time = current_time; } #endif // Check whether there are any commands pending for this thread. - received = signaler.recv (&cmd, false); + rc = signaler.recv (&cmd, false); } // Process all the commands available at the moment. - while (received) { + while (true) { + if (rc == -1 && errno == EAGAIN) + break; + if (rc == -1 && errno == EINTR) + return -1; + errno_assert (rc == 0); cmd.destination->process_command (cmd); - received = signaler.recv (&cmd, false); + rc = signaler.recv (&cmd, false); + } + + if (zombie) { + errno = ETERM; + return -1; } + + return 0; } void zmq::socket_base_t::process_stop () diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 1a7f9f0..4ba9494 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -124,7 +124,6 @@ namespace zmq private: -// TODO: Check whether we still need this flag... // If true, socket was already closed but not yet deallocated // because either shutdown is in process or there are still pipes // attached to the socket. @@ -147,7 +146,7 @@ namespace zmq // set to true, returns only after at least one command was processed. // If throttle argument is true, commands are processed at most once // in a predefined time period. - void process_commands (bool block_, bool throttle_); + int process_commands (bool block_, bool throttle_); // Handlers for incoming commands. void process_stop (); diff --git a/src/zmq.cpp b/src/zmq.cpp index 736f764..9a4bdec 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -409,17 +409,12 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) while (true) { - // Wait for events. Ignore interrupts if there's infinite timeout. + // Wait for events. while (true) { int rc = poll (pollfds, nitems_, first_pass ? 0 : timeout); if (rc == -1 && errno == EINTR) { - if (timeout_ < 0) - continue; - else { - // TODO: Calculate remaining timeout and restart poll (). - free (pollfds); - return 0; - } + free (pollfds); + return -1; } errno_assert (rc >= 0); break; @@ -474,6 +469,9 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) if (timeout == -1 && nevents == 0) continue; + // TODO: if nevents is zero recompute timeout and loop + // if it is not yet reached. + break; } @@ -544,13 +542,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) #if defined ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); #else - if (rc == -1 && errno == EINTR) { - if (timeout_ < 0) - continue; - else - // TODO: Calculate remaining timeout and restart select (). - return 0; - } + if (rc == -1 && errno == EINTR) + return -1; errno_assert (rc >= 0); #endif break; @@ -610,6 +603,9 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) if (timeout_ < 0 && nevents == 0) continue; + // TODO: if nevents is zero recompute timeout and loop + // if it is not yet reached. + break; } |