summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorFabien Ninoles <fabien@tzone.org>2011-06-17 12:22:02 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-06-17 12:22:02 +0200
commitd7923f08cab62ef40027a92f596ff45428870838 (patch)
tree370ad14bc9d1ebbc14f9d5f8077f81e28e301f5f /src
parent65d2b70312efb148814b58d9cd38cc7069b53a3b (diff)
Add sockopt ZMQ_RCVTIMEO/ZMQ_SNDTIMEO.
- Add doc and tests - Add options and setup - Wait using poll/select Signed-off-by: Fabien Ninoles <fabien@tzone.org> Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/ctx.cpp2
-rw-r--r--src/io_thread.cpp2
-rw-r--r--src/mailbox.cpp106
-rw-r--r--src/mailbox.hpp7
-rw-r--r--src/options.cpp36
-rw-r--r--src/options.hpp4
-rw-r--r--src/reaper.cpp2
-rw-r--r--src/socket_base.cpp100
-rw-r--r--src/socket_base.hpp6
9 files changed, 217 insertions, 48 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp
index fb5420d..783bcba 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -142,7 +142,7 @@ int zmq::ctx_t::terminate ()
// Wait till reaper thread closes all the sockets.
command_t cmd;
- int rc = term_mailbox.recv (&cmd, true);
+ int rc = term_mailbox.recv (&cmd, -1);
if (rc == -1 && errno == EINTR)
return -1;
zmq_assert (rc == 0);
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index 9678392..c6f3880 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -70,7 +70,7 @@ void zmq::io_thread_t::in_event ()
// Get the next command. If there is none, exit.
command_t cmd;
- int rc = mailbox.recv (&cmd, false);
+ int rc = mailbox.recv (&cmd, 0);
if (rc != 0 && errno == EINTR)
continue;
if (rc != 0 && errno == EAGAIN)
diff --git a/src/mailbox.cpp b/src/mailbox.cpp
index 221396b..402d025 100644
--- a/src/mailbox.cpp
+++ b/src/mailbox.cpp
@@ -18,8 +18,33 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "mailbox.hpp"
#include "platform.hpp"
+
+#if defined ZMQ_FORCE_SELECT
+#define ZMQ_RCVTIMEO_BASED_ON_SELECT
+#elif defined ZMQ_FORCE_POLL
+#define ZMQ_RCVTIMEO_BASED_ON_POLL
+#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
+ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
+ defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
+ defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
+ defined ZMQ_HAVE_NETBSD
+#define ZMQ_RCVTIMEO_BASED_ON_POLL
+#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+#define ZMQ_RCVTIMEO_BASED_ON_SELECT
+#endif
+
+// On AIX, poll.h has to be included before zmq.h to get consistent
+// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
+// instead of 'events' and 'revents' and defines macros to map from POSIX-y
+// names to AIX-specific names).
+#if defined ZMQ_RCVTIMEO_BASED_ON_POLL
+#include <poll.h>
+#elif defined ZMQ_RCVTIMEO_BASED_ON_SELECT
+#include <sys/select.h>
+#endif
+
+#include "mailbox.hpp"
#include "err.hpp"
#include "fd.hpp"
#include "ip.hpp"
@@ -79,10 +104,14 @@ void zmq::mailbox_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t));
}
-int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
+int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
{
+ // If there's a finite timeout, poll on the fd.
+ if (timeout_ > 0)
+ return recv_timeout (cmd_, timeout_);
+
// If required, set the reader to blocking mode.
- if (block_) {
+ if (timeout_ < 0) {
unsigned long argp = 0;
int rc = ioctlsocket (r, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR);
@@ -97,7 +126,7 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
err = EAGAIN;
// Re-set the reader to non-blocking mode.
- if (block_) {
+ if (timeout_ < 0) {
unsigned long argp = 1;
int rc = ioctlsocket (r, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR);
@@ -194,20 +223,24 @@ void zmq::mailbox_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t));
}
-int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
+int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
{
+ // If there's a finite timeout, poll on the fd.
+ if (timeout_ > 0)
+ return recv_timeout (cmd_, timeout_);
+
#ifdef MSG_DONTWAIT
// Attempt to read an entire command. Returns EAGAIN if non-blocking
// mode is requested and a command is not available.
ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),
- block_ ? 0 : MSG_DONTWAIT);
+ timeout_ < 0 ? 0 : MSG_DONTWAIT);
if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
return -1;
#else
// If required, set the reader to blocking mode.
- if (block_) {
+ if (timeout_ < 0) {
int flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
@@ -223,7 +256,7 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
err = errno;
// Re-set the reader to non-blocking mode.
- if (block_) {
+ if (timeout_ < 0) {
int flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
@@ -380,3 +413,60 @@ int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_)
#endif
}
+int zmq::mailbox_t::recv_timeout (command_t *cmd_, int timeout_)
+{
+#ifdef ZMQ_RCVTIMEO_BASED_ON_POLL
+
+ struct pollfd pfd;
+ pfd.fd = r;
+ pfd.events = POLLIN;
+ int rc = poll (&pfd, 1, timeout_);
+ if (unlikely (rc < 0)) {
+ zmq_assert (errno == EINTR);
+ return -1;
+ }
+ else if (unlikely (rc == 0)) {
+ errno = EAGAIN;
+ return -1;
+ }
+ zmq_assert (rc == 1);
+ zmq_assert (pfd.revents & POLLIN);
+
+#elif defined ZMQ_RCVTIMEO_BASED_ON_SELECT
+
+ fd_set fds;
+ FD_ZERO (&fds);
+ FD_SET (r, &fds);
+ struct timeval timeout;
+ timeout.tv_sec = timeout_ / 1000;
+ timeout.tv_usec = timeout_ % 1000 * 1000;
+ int rc = select (r + 1, &fds, NULL, NULL, &timeout);
+ if (unlikely (rc < 0)) {
+ zmq_assert (errno == EINTR);
+ return -1;
+ }
+ else if (unlikely (rc == 0)) {
+ errno = EAGAIN;
+ return -1;
+ }
+ zmq_assert (rc == 1);
+
+#else
+#error
+#endif
+
+ // The file descriptor is ready for reading. Extract one command out of it.
+ ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
+ if (unlikely (rc < 0 && errno == EINTR))
+ return -1;
+ zmq_assert (nbytes == sizeof (command_t));
+ return 0;
+}
+
+#if defined ZMQ_RCVTIMEO_BASED_ON_SELECT
+#undef ZMQ_RCVTIMEO_BASED_ON_SELECT
+#endif
+#if defined ZMQ_RCVTIMEO_BASED_ON_POLL
+#undef ZMQ_RCVTIMEO_BASED_ON_POLL
+#endif
+
diff --git a/src/mailbox.hpp b/src/mailbox.hpp
index 96bf4eb..1b54aac 100644
--- a/src/mailbox.hpp
+++ b/src/mailbox.hpp
@@ -41,7 +41,7 @@ namespace zmq
fd_t get_fd ();
void send (const command_t &cmd_);
- int recv (command_t *cmd_, bool block_);
+ int recv (command_t *cmd_, int timeout_);
private:
@@ -52,6 +52,11 @@ namespace zmq
// Platform-dependent function to create a socketpair.
static int make_socketpair (fd_t *r_, fd_t *w_);
+ // Receives a command with the specific timeout.
+ // This function is not to be used for non-blocking or inifinitely
+ // blocking recvs.
+ int recv_timeout (command_t *cmd_, int timeout_);
+
// Disable copying of mailbox_t object.
mailbox_t (const mailbox_t&);
const mailbox_t &operator = (const mailbox_t&);
diff --git a/src/options.cpp b/src/options.cpp
index 29cf023..80ab294 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -39,6 +39,8 @@ zmq::options_t::options_t () :
backlog (100),
maxmsgsize (-1),
filter (1),
+ rcvtimeo (-1),
+ sndtimeo (-1),
immediate_connect (true)
{
}
@@ -182,6 +184,22 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
filter = *((int*) optval_);
return 0;
+ case ZMQ_RCVTIMEO:
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ rcvtimeo = *((int*) optval_);
+ return 0;
+
+ case ZMQ_SNDTIMEO:
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ sndtimeo = *((int*) optval_);
+ return 0;
+
}
errno = EINVAL;
@@ -336,6 +354,24 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int);
return 0;
+ case ZMQ_RCVTIMEO:
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = rcvtimeo;
+ *optvallen_ = sizeof (int);
+ return 0;
+
+ case ZMQ_SNDTIMEO:
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = sndtimeo;
+ *optvallen_ = sizeof (int);
+ return 0;
+
}
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index e055919..858ec2e 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -78,6 +78,10 @@ namespace zmq
// If 1, (X)SUB socket should filter the messages. If 0, it should not.
int filter;
+ // The timeout for send/recv operations for this socket.
+ int rcvtimeo;
+ int sndtimeo;
+
// If true, when connecting, pipes are created immediately without
// waiting for the connection to be established. That way the socket
// is not aware of the peer's identity, however, it is able to send
diff --git a/src/reaper.cpp b/src/reaper.cpp
index 0295137..4c67b37 100644
--- a/src/reaper.cpp
+++ b/src/reaper.cpp
@@ -61,7 +61,7 @@ void zmq::reaper_t::in_event ()
// Get the next command. If there is none, exit.
command_t cmd;
- int rc = mailbox.recv (&cmd, false);
+ int rc = mailbox.recv (&cmd, 0);
if (rc != 0 && errno == EINTR)
continue;
if (rc != 0 && errno == EAGAIN)
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 2b1d8af..dc6b5f5 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -288,7 +288,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
errno = EINVAL;
return -1;
}
- int rc = process_commands (false, false);
+ int rc = process_commands (0, false);
if (rc != 0 && (errno == EINTR || errno == ETERM))
return -1;
errno_assert (rc == 0);
@@ -475,7 +475,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
}
// Process pending commands, if any.
- int rc = process_commands (false, true);
+ int rc = process_commands (0, true);
if (unlikely (rc != 0))
return -1;
@@ -487,20 +487,38 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
rc = xsend (msg_, flags_);
if (rc == 0)
return 0;
+ if (unlikely (errno != EAGAIN))
+ return -1;
// In case of non-blocking send we'll simply propagate
- // the error - including EAGAIN - upwards.
- if (flags_ & ZMQ_DONTWAIT)
+ // the error - including EAGAIN - up the stack.
+ if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0)
return -1;
+ // Compute the time when the timeout should occur.
+ // If the timeout is infite, don't care.
+ clock_t clock ;
+ int timeout = options.sndtimeo;
+ uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
+
// Oops, we couldn't send the message. Wait for the next
// command, process it and try to send the message again.
- while (rc != 0) {
- if (errno != EAGAIN)
- return -1;
- if (unlikely (process_commands (true, false) != 0))
+ // If timeout is reached in the meantime, return EAGAIN.
+ while (true) {
+ if (unlikely (process_commands (timeout, false) != 0))
return -1;
rc = xsend (msg_, flags_);
+ if (rc == 0)
+ break;
+ if (unlikely (errno != EAGAIN))
+ return -1;
+ if (timeout > 0) {
+ timeout = end - clock.now_ms ();
+ if (timeout <= 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+ }
}
return 0;
}
@@ -521,7 +539,8 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// Get the message.
int rc = xrecv (msg_, flags_);
- int err = errno;
+ if (unlikely (rc != 0 && errno != EAGAIN))
+ return -1;
// Once every inbound_poll_rate messages check for signals and process
// incoming commands. This happens only if we are not polling altogether
@@ -532,7 +551,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// described above) from the one used by 'send'. This is because counting
// ticks is more efficient than doing RDTSC all the time.
if (++ticks == inbound_poll_rate) {
- if (unlikely (process_commands (false, false) != 0))
+ if (unlikely (process_commands (0, false) != 0))
return -1;
ticks = 0;
}
@@ -545,17 +564,12 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
return 0;
}
- // If we don't have the message, restore the original cause of the problem.
- errno = err;
-
// If the message cannot be fetched immediately, there are two scenarios.
// For non-blocking recv, commands are processed in case there's an
// activate_reader command already waiting int a command pipe.
// If it's not, return EAGAIN.
- if (flags_ & ZMQ_DONTWAIT) {
- if (errno != EAGAIN)
- return -1;
- if (unlikely (process_commands (false, false) != 0))
+ if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) {
+ if (unlikely (process_commands (0, false) != 0))
return -1;
ticks = 0;
@@ -568,17 +582,33 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
return rc;
}
+ // Compute the time when the timeout should occur.
+ // If the timeout is infite, don't care.
+ clock_t clock ;
+ int timeout = options.rcvtimeo;
+ uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
+
// In blocking scenario, commands are processed over and over again until
// we are able to fetch a message.
bool block = (ticks != 0);
- while (rc != 0) {
- if (errno != EAGAIN)
- return -1;
- if (unlikely (process_commands (block, false) != 0))
+ while (true) {
+ if (unlikely (process_commands (block ? timeout : 0, false) != 0))
return -1;
rc = xrecv (msg_, flags_);
- ticks = 0;
+ if (rc == 0) {
+ ticks = 0;
+ break;
+ }
+ if (unlikely (errno != EAGAIN))
+ return -1;
block = true;
+ if (timeout > 0) {
+ timeout = end - clock.now_ms ();
+ if (timeout <= 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+ }
}
rcvmore = msg_->flags () & msg_t::more;
@@ -658,18 +688,20 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
check_destroy ();
}
-int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
+int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
{
int rc;
command_t cmd;
- if (block_) {
- rc = mailbox.recv (&cmd, true);
- if (rc == -1 && errno == EINTR)
- return -1;
- errno_assert (rc == 0);
+ if (timeout_ != 0) {
+
+ // If we are asked to wait, simply ask mailbox to wait.
+ rc = mailbox.recv (&cmd, timeout_);
}
else {
+ // If we are asked not to wait, check whether we haven't processed
+ // commands recently, so that we can throttle the new commands.
+
// Get the CPU's tick counter. If 0, the counter is not available.
uint64_t tsc = zmq::clock_t::rdtsc ();
@@ -690,7 +722,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
}
// Check whether there are any commands pending for this thread.
- rc = mailbox.recv (&cmd, false);
+ rc = mailbox.recv (&cmd, 0);
}
// Process all the commands available at the moment.
@@ -701,7 +733,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
return -1;
errno_assert (rc == 0);
cmd.destination->process_command (cmd);
- rc = mailbox.recv (&cmd, false);
+ rc = mailbox.recv (&cmd, 0);
}
if (ctx_terminated) {
@@ -797,9 +829,11 @@ void zmq::socket_base_t::xhiccuped (pipe_t *pipe_)
void zmq::socket_base_t::in_event ()
{
- // Process any commands from other threads/sockets that may be available
- // at the moment. Ultimately, socket will be destroyed.
- process_commands (false, false);
+ // This function is invoked only once the socket is running in the context
+ // of the reaper thread. Process any commands from other threads/sockets
+ // that may be available at the moment. Ultimately, the socket will
+ // be destroyed.
+ process_commands (0, false);
check_destroy ();
}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index ed5620c..69a8aac 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -160,11 +160,11 @@ namespace zmq
// Register the pipe with this socket.
void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
- // Processes commands sent to this socket (if any). If 'block' is
- // set to true, returns only after at least one command was processed.
+ // Processes commands sent to this socket (if any). If timeout is -1,
+ // returns only after at least one command was processed.
// If throttle argument is true, commands are processed at most once
// in a predefined time period.
- int process_commands (bool block_, bool throttle_);
+ int process_commands (int timeout_, bool throttle_);
// Handlers for incoming commands.
void process_stop ();