summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/zmq_getsockopt.txt2
-rw-r--r--doc/zmq_poll.txt2
-rw-r--r--doc/zmq_recv.txt3
-rw-r--r--doc/zmq_send.txt3
-rw-r--r--doc/zmq_setsockopt.txt2
-rw-r--r--src/ctx.cpp5
-rw-r--r--src/ctx.hpp3
-rw-r--r--src/io_thread.cpp8
-rw-r--r--src/signaler.cpp47
-rw-r--r--src/signaler.hpp2
-rw-r--r--src/socket_base.cpp71
-rw-r--r--src/socket_base.hpp3
-rw-r--r--src/zmq.cpp26
13 files changed, 93 insertions, 84 deletions
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index 1e36a2a..d179e79 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -216,6 +216,8 @@ option value.
The 0MQ 'context' associated with the specified 'socket' was terminated.
*EFAULT*::
The provided 'socket' was not valid (NULL).
+*EINTR*::
+The operation was interrupted by delivery of a signal.
EXAMPLE
diff --git a/doc/zmq_poll.txt b/doc/zmq_poll.txt
index fe2a209..92c0a3d 100644
--- a/doc/zmq_poll.txt
+++ b/doc/zmq_poll.txt
@@ -98,6 +98,8 @@ At least one of the members of the 'items' array refers to a 'socket' whose
associated 0MQ 'context' was terminated.
*EFAULT*::
The provided 'items' was not valid (NULL).
+*EINTR*::
+The poll was interrupted by delivery of a signal before any event was available.
EXAMPLE
diff --git a/doc/zmq_recv.txt b/doc/zmq_recv.txt
index dc60af6..dbbbd75 100644
--- a/doc/zmq_recv.txt
+++ b/doc/zmq_recv.txt
@@ -65,6 +65,9 @@ _messaging patterns_ section of linkzmq:zmq_socket[3] for more information.
The 0MQ 'context' associated with the specified 'socket' was terminated.
*EFAULT*::
The provided 'socket' was not valid (NULL).
+*EINTR*::
+The operation was interrupted by delivery of a signal before a message was
+available.
EXAMPLE
diff --git a/doc/zmq_send.txt b/doc/zmq_send.txt
index 793d1a8..231dfcc 100644
--- a/doc/zmq_send.txt
+++ b/doc/zmq_send.txt
@@ -71,6 +71,9 @@ _messaging patterns_ section of linkzmq:zmq_socket[3] for more information.
The 0MQ 'context' associated with the specified 'socket' was terminated.
*EFAULT*::
The provided 'context' was not valid (NULL).
+*EINTR*::
+The operation was interrupted by delivery of a signal before the message was
+sent.
EXAMPLE
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index 1b551c6..a60fc26 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -231,6 +231,8 @@ _option_value_ is invalid.
The 0MQ 'context' associated with the specified 'socket' was terminated.
*EFAULT*::
The provided 'socket' was not valid (NULL).
+*EINTR*::
+The operation was interrupted by delivery of a signal.
EXAMPLE
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 65c5316..2660e1f 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -219,11 +219,6 @@ void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_)
slots [slot_]->send (command_);
}
-bool zmq::ctx_t::recv_command (uint32_t slot_, command_t *command_, bool block_)
-{
- return slots [slot_]->recv (command_, block_);
-}
-
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
{
// Find the I/O thread with minimum load.
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 5f6cc83..360ca0e 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -64,9 +64,6 @@ namespace zmq
// Send command to the destination slot.
void send_command (uint32_t slot_, const command_t &command_);
- // Receive command from the source slot.
- bool recv_command (uint32_t slot_, command_t *command_, bool block_);
-
// Returns the I/O thread that is the least busy at the moment.
// Taskset specifies which I/O threads are eligible (0 = all).
class io_thread_t *choose_io_thread (uint64_t taskset_);
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index 3d202cf..d1d95f3 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -71,8 +71,12 @@ void zmq::io_thread_t::in_event ()
// Get the next command. If there is none, exit.
command_t cmd;
- if (!signaler.recv (&cmd, false))
- break;
+ int rc = signaler.recv (&cmd, false);
+ if (rc != 0 && errno == EINTR)
+ continue;
+ if (rc != 0 && errno == EAGAIN)
+ break;
+ errno_assert (rc == 0);
// Process the command.
cmd.destination->process_command (cmd);
diff --git a/src/signaler.cpp b/src/signaler.cpp
index d4a9214..c14a709 100644
--- a/src/signaler.cpp
+++ b/src/signaler.cpp
@@ -112,7 +112,7 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (rc == sizeof (command_t));
}
-bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
+int zmq::signaler_t::recv (command_t *cmd_, bool block_)
{
if (block_) {
@@ -122,10 +122,12 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
wsa_assert (rc != SOCKET_ERROR);
}
- bool result;
+ int err;
+ int result;
int nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0);
if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) {
- result = false;
+ err = EAGAIN;
+ result = -1;
}
else {
wsa_assert (nbytes != -1);
@@ -133,7 +135,7 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
// Check whether we haven't got half of a signal.
zmq_assert (nbytes % sizeof (uint32_t) == 0);
- result = true;
+ result = 0;
}
if (block_) {
@@ -144,6 +146,8 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
wsa_assert (rc != SOCKET_ERROR);
}
+ if (result == -1)
+ errno = err;
return result;
}
@@ -184,7 +188,7 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t));
}
-bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
+int zmq::signaler_t::recv (command_t *cmd_, bool block_)
{
if (block_) {
@@ -196,13 +200,12 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
errno_assert (rc != -1);
}
- bool result;
- ssize_t nbytes;
- do {
- nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0);
- } while (nbytes == -1 && errno == EINTR);
- if (nbytes == -1 && errno == EAGAIN) {
- result = false;
+ int err;
+ int result;
+ ssize_t nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0);
+ if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) {
+ err = errno;
+ result = -1;
}
else {
zmq_assert (nbytes != -1);
@@ -210,7 +213,7 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
// Check whether we haven't got half of command.
zmq_assert (nbytes == sizeof (command_t));
- result = true;
+ result = 0;
}
if (block_) {
@@ -223,6 +226,8 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
errno_assert (rc != -1);
}
+ if (result == -1)
+ errno = err;
return result;
}
@@ -266,24 +271,18 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t));
}
-bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
+int zmq::signaler_t::recv (command_t *cmd_, bool block_)
{
ssize_t nbytes;
- do {
- nbytes = ::recv (r, cmd_, sizeof (command_t),
- block_ ? 0 : MSG_DONTWAIT);
- } while (nbytes == -1 && errno == EINTR);
-
- // If there's no signal available return false.
- if (nbytes == -1 && errno == EAGAIN)
- return false;
-
+ nbytes = ::recv (r, cmd_, sizeof (command_t), block_ ? 0 : MSG_DONTWAIT);
+ if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
+ return -1;
errno_assert (nbytes != -1);
// Check whether we haven't got half of command.
zmq_assert (nbytes == sizeof (command_t));
- return true;
+ return 0;
}
#endif
diff --git a/src/signaler.hpp b/src/signaler.hpp
index 64a1899..217c3a6 100644
--- a/src/signaler.hpp
+++ b/src/signaler.hpp
@@ -40,7 +40,7 @@ namespace zmq
fd_t get_fd ();
void send (const command_t &cmd_);
- bool recv (command_t *cmd_, bool block_);
+ int recv (command_t *cmd_, bool block_);
private:
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 4a0ed24..cdad09d 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -244,7 +244,10 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
errno = EINVAL;
return -1;
}
- process_commands (false, false);
+ int rc = process_commands (false, false);
+ if (rc != 0 && errno == EINTR)
+ return -1;
+ errno_assert (rc == 0);
*((uint32_t*) optval_) = 0;
if (has_out ())
*((uint32_t*) optval_) |= ZMQ_POLLOUT;
@@ -420,18 +423,16 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
}
// Process pending commands, if any.
- process_commands (false, true);
- if (unlikely (zombie)) {
- errno = ETERM;
+ int rc = process_commands (false, true);
+ if (unlikely (rc != 0))
return -1;
- }
// At this point we impose the MORE flag on the message.
if (flags_ & ZMQ_SNDMORE)
msg_->flags |= ZMQ_MSG_MORE;
// Try to send the message.
- int rc = xsend (msg_, flags_);
+ rc = xsend (msg_, flags_);
if (rc == 0)
return 0;
@@ -445,11 +446,8 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
while (rc != 0) {
if (errno != EAGAIN)
return -1;
- process_commands (true, false);
- if (unlikely (zombie)) {
- errno = ETERM;
+ if (unlikely (process_commands (true, false) != 0))
return -1;
- }
rc = xsend (msg_, flags_);
}
return 0;
@@ -475,11 +473,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// described above) from the one used by 'send'. This is because counting
// ticks is more efficient than doing rdtsc all the time.
if (++ticks == inbound_poll_rate) {
- process_commands (false, false);
- if (unlikely (zombie)) {
- errno = ETERM;
+ if (unlikely (process_commands (false, false) != 0))
return -1;
- }
ticks = 0;
}
@@ -501,11 +496,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
if (flags_ & ZMQ_NOBLOCK) {
if (errno != EAGAIN)
return -1;
- process_commands (false, false);
- if (unlikely (zombie)) {
- errno = ETERM;
+ if (unlikely (process_commands (false, false) != 0))
return -1;
- }
ticks = 0;
rc = xrecv (msg_, flags_);
@@ -522,11 +514,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
while (rc != 0) {
if (errno != EAGAIN)
return -1;
- process_commands (true, false);
- if (unlikely (zombie)) {
- errno = ETERM;
+ if (unlikely (process_commands (true, false) != 0))
return -1;
- }
rc = xrecv (msg_, flags_);
ticks = 0;
}
@@ -619,13 +608,15 @@ bool zmq::socket_base_t::dezombify ()
return false;
}
-void zmq::socket_base_t::process_commands (bool block_, bool throttle_)
+int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
{
- bool received;
+ int rc;
command_t cmd;
if (block_) {
- received = signaler.recv (&cmd, true);
- zmq_assert (received);
+ rc = signaler.recv (&cmd, true);
+ if (rc == -1 && errno == EINTR)
+ return -1;
+ errno_assert (rc == 0);
}
else {
@@ -649,24 +640,40 @@ void zmq::socket_base_t::process_commands (bool block_, bool throttle_)
#else
#error
#endif
-
// Check whether certain time have elapsed since last command
// processing.
- if (current_time - last_processing_time <= max_command_delay)
- return;
+ if (current_time - last_processing_time <= max_command_delay) {
+
+ // No command was processed, so the socket should
+ // not get into the zombie state.
+ zmq_assert (!zombie);
+ return 0;
+ }
last_processing_time = current_time;
}
#endif
// Check whether there are any commands pending for this thread.
- received = signaler.recv (&cmd, false);
+ rc = signaler.recv (&cmd, false);
}
// Process all the commands available at the moment.
- while (received) {
+ while (true) {
+ if (rc == -1 && errno == EAGAIN)
+ break;
+ if (rc == -1 && errno == EINTR)
+ return -1;
+ errno_assert (rc == 0);
cmd.destination->process_command (cmd);
- received = signaler.recv (&cmd, false);
+ rc = signaler.recv (&cmd, false);
+ }
+
+ if (zombie) {
+ errno = ETERM;
+ return -1;
}
+
+ return 0;
}
void zmq::socket_base_t::process_stop ()
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 1a7f9f0..4ba9494 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -124,7 +124,6 @@ namespace zmq
private:
-// TODO: Check whether we still need this flag...
// If true, socket was already closed but not yet deallocated
// because either shutdown is in process or there are still pipes
// attached to the socket.
@@ -147,7 +146,7 @@ namespace zmq
// set to true, returns only after at least one command was processed.
// If throttle argument is true, commands are processed at most once
// in a predefined time period.
- void process_commands (bool block_, bool throttle_);
+ int process_commands (bool block_, bool throttle_);
// Handlers for incoming commands.
void process_stop ();
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 736f764..9a4bdec 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -409,17 +409,12 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
while (true) {
- // Wait for events. Ignore interrupts if there's infinite timeout.
+ // Wait for events.
while (true) {
int rc = poll (pollfds, nitems_, first_pass ? 0 : timeout);
if (rc == -1 && errno == EINTR) {
- if (timeout_ < 0)
- continue;
- else {
- // TODO: Calculate remaining timeout and restart poll ().
- free (pollfds);
- return 0;
- }
+ free (pollfds);
+ return -1;
}
errno_assert (rc >= 0);
break;
@@ -474,6 +469,9 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
if (timeout == -1 && nevents == 0)
continue;
+ // TODO: if nevents is zero recompute timeout and loop
+ // if it is not yet reached.
+
break;
}
@@ -544,13 +542,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
#if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
- if (rc == -1 && errno == EINTR) {
- if (timeout_ < 0)
- continue;
- else
- // TODO: Calculate remaining timeout and restart select ().
- return 0;
- }
+ if (rc == -1 && errno == EINTR)
+ return -1;
errno_assert (rc >= 0);
#endif
break;
@@ -610,6 +603,9 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
if (timeout_ < 0 && nevents == 0)
continue;
+ // TODO: if nevents is zero recompute timeout and loop
+ // if it is not yet reached.
+
break;
}