diff options
author | Fabien Ninoles <fabien@tzone.org> | 2011-06-17 12:22:02 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-06-17 12:22:02 +0200 |
commit | d7923f08cab62ef40027a92f596ff45428870838 (patch) | |
tree | 370ad14bc9d1ebbc14f9d5f8077f81e28e301f5f | |
parent | 65d2b70312efb148814b58d9cd38cc7069b53a3b (diff) |
Add sockopt ZMQ_RCVTIMEO/ZMQ_SNDTIMEO.
- Add doc and tests
- Add options and setup
- Wait using poll/select
Signed-off-by: Fabien Ninoles <fabien@tzone.org>
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | doc/zmq_getsockopt.txt | 35 | ||||
-rw-r--r-- | doc/zmq_setsockopt.txt | 32 | ||||
-rw-r--r-- | include/zmq.h | 2 | ||||
-rw-r--r-- | perf/remote_thr.cpp | 2 | ||||
-rw-r--r-- | src/ctx.cpp | 2 | ||||
-rw-r--r-- | src/io_thread.cpp | 2 | ||||
-rw-r--r-- | src/mailbox.cpp | 106 | ||||
-rw-r--r-- | src/mailbox.hpp | 7 | ||||
-rw-r--r-- | src/options.cpp | 36 | ||||
-rw-r--r-- | src/options.hpp | 4 | ||||
-rw-r--r-- | src/reaper.cpp | 2 | ||||
-rw-r--r-- | src/socket_base.cpp | 100 | ||||
-rw-r--r-- | src/socket_base.hpp | 6 | ||||
-rw-r--r-- | tests/Makefile.am | 5 | ||||
-rw-r--r-- | tests/test_timeo.cpp | 115 |
16 files changed, 407 insertions, 50 deletions
@@ -28,6 +28,7 @@ tests/test_reqrep_ipc tests/test_reqrep_tcp tests/test_shutdown_stress tests/test_hwm +tests/test_timeo src/platform.hpp* src/stamp-h1 devices/zmq_forwarder/zmq_forwarder diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index b48b06b..270ea6f 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -255,7 +255,8 @@ interval for the specified 'socket'. This is the maximum period 0MQ shall wait between attempts to reconnect. On each reconnect attempt, the previous interval shall be doubled untill ZMQ_RECONNECT_IVL_MAX is reached. This allows for exponential backoff strategy. Default value means no exponential backoff is -performed and reconnect interval calculations are only based on ZMQ_RECONNECT_IVL. +performed and reconnect interval calculations are only based on +ZMQ_RECONNECT_IVL. NOTE: Values less than ZMQ_RECONNECT_IVL will be ignored. @@ -324,6 +325,38 @@ Default value:: 1 Applicable socket types:: ZMQ_SUB, ZMQ_XSUB +ZMQ_RCVTIMEO: Maximum time before a socket operation returns with EAGAIN +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Retrieve the timeout for recv operation on the socket. If the value is `0`, +_zmq_recv(3)_ will return immediately, with a EAGAIN error if there is no +message to receive. If the value is `-1`, it will block until a message is +available. For all other values, it will wait for a message for that amount +of time before returning with an EAGAIN error. + +[horizontal] +Option value type:: int +Option value unit:: milliseconds +Default value:: -1 (infinite) +Applicable socket types:: all + + +ZMQ_SNDTIMEO: Maximum time before a socket operation returns with EAGAIN +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Retrieve the timeout for send operation on the socket. If the value is `0`, +_zmq_send(3)_ will return immediately, with a EAGAIN error if the message +cannot be sent. If the value is `-1`, it will block until the message is sent. +For all other values, it will try to send the message for that amount of time +before returning with an EAGAIN error. + +[horizontal] +Option value type:: int +Option value unit:: milliseconds +Default value:: -1 (infinite) +Applicable socket types:: all + + ZMQ_FD: Retrieve file descriptor associated with the socket ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_FD' option shall retrieve the file descriptor associated with the diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 4b639c5..0093085 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -328,6 +328,38 @@ Default value:: 1 Applicable socket types:: ZMQ_SUB, ZMQ_XSUB +ZMQ_RCVTIMEO: Maximum time before a recv operation returns with EAGAIN +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Sets the timeout for receive operation on the socket. If the value is `0`, +_zmq_recv(3)_ will return immediately, with a EAGAIN error if there is no +message to receive. If the value is `-1`, it will block until a message is +available. For all other values, it will wait for a message for that amount +of time before returning with an EAGAIN error. + +[horizontal] +Option value type:: int +Option value unit:: milliseconds +Default value:: -1 (infinite) +Applicable socket types:: all + + +ZMQ_SNDTIMEO: Maximum time before a send operation returns with EAGAIN +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Sets the timeout for send operation on the socket. If the value is `0`, +_zmq_send(3)_ will return immediately, with a EAGAIN error if the message +cannot be sent. If the value is `-1`, it will block until the message is sent. +For all other values, it will try to send the message for that amount of time +before returning with an EAGAIN error. + +[horizontal] +Option value type:: int +Option value unit:: milliseconds +Default value:: -1 (infinite) +Applicable socket types:: all + + RETURN VALUE ------------ The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it diff --git a/include/zmq.h b/include/zmq.h index 8d1d57b..ca34e58 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -181,6 +181,8 @@ ZMQ_EXPORT int zmq_term (void *context); #define ZMQ_RCVHWM 24 #define ZMQ_MULTICAST_HOPS 25 #define ZMQ_FILTER 26 +#define ZMQ_RCVTIMEO 27 +#define ZMQ_SNDTIMEO 28 /* Send/recv options. */ #define ZMQ_DONTWAIT 1 diff --git a/perf/remote_thr.cpp b/perf/remote_thr.cpp index ba36b98..363ae7c 100644 --- a/perf/remote_thr.cpp +++ b/perf/remote_thr.cpp @@ -88,6 +88,8 @@ int main (int argc, char *argv []) } } +zmq_sleep (2); + rc = zmq_close (s); if (rc != 0) { printf ("error in zmq_close: %s\n", zmq_strerror (errno)); diff --git a/src/ctx.cpp b/src/ctx.cpp index fb5420d..783bcba 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -142,7 +142,7 @@ int zmq::ctx_t::terminate () // Wait till reaper thread closes all the sockets. command_t cmd; - int rc = term_mailbox.recv (&cmd, true); + int rc = term_mailbox.recv (&cmd, -1); if (rc == -1 && errno == EINTR) return -1; zmq_assert (rc == 0); diff --git a/src/io_thread.cpp b/src/io_thread.cpp index 9678392..c6f3880 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -70,7 +70,7 @@ void zmq::io_thread_t::in_event () // Get the next command. If there is none, exit. command_t cmd; - int rc = mailbox.recv (&cmd, false); + int rc = mailbox.recv (&cmd, 0); if (rc != 0 && errno == EINTR) continue; if (rc != 0 && errno == EAGAIN) diff --git a/src/mailbox.cpp b/src/mailbox.cpp index 221396b..402d025 100644 --- a/src/mailbox.cpp +++ b/src/mailbox.cpp @@ -18,8 +18,33 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "mailbox.hpp" #include "platform.hpp" + +#if defined ZMQ_FORCE_SELECT +#define ZMQ_RCVTIMEO_BASED_ON_SELECT +#elif defined ZMQ_FORCE_POLL +#define ZMQ_RCVTIMEO_BASED_ON_POLL +#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ + defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ + defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ + defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\ + defined ZMQ_HAVE_NETBSD +#define ZMQ_RCVTIMEO_BASED_ON_POLL +#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS +#define ZMQ_RCVTIMEO_BASED_ON_SELECT +#endif + +// On AIX, poll.h has to be included before zmq.h to get consistent +// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents' +// instead of 'events' and 'revents' and defines macros to map from POSIX-y +// names to AIX-specific names). +#if defined ZMQ_RCVTIMEO_BASED_ON_POLL +#include <poll.h> +#elif defined ZMQ_RCVTIMEO_BASED_ON_SELECT +#include <sys/select.h> +#endif + +#include "mailbox.hpp" #include "err.hpp" #include "fd.hpp" #include "ip.hpp" @@ -79,10 +104,14 @@ void zmq::mailbox_t::send (const command_t &cmd_) zmq_assert (nbytes == sizeof (command_t)); } -int zmq::mailbox_t::recv (command_t *cmd_, bool block_) +int zmq::mailbox_t::recv (command_t *cmd_, int timeout_) { + // If there's a finite timeout, poll on the fd. + if (timeout_ > 0) + return recv_timeout (cmd_, timeout_); + // If required, set the reader to blocking mode. - if (block_) { + if (timeout_ < 0) { unsigned long argp = 0; int rc = ioctlsocket (r, FIONBIO, &argp); wsa_assert (rc != SOCKET_ERROR); @@ -97,7 +126,7 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_) err = EAGAIN; // Re-set the reader to non-blocking mode. - if (block_) { + if (timeout_ < 0) { unsigned long argp = 1; int rc = ioctlsocket (r, FIONBIO, &argp); wsa_assert (rc != SOCKET_ERROR); @@ -194,20 +223,24 @@ void zmq::mailbox_t::send (const command_t &cmd_) zmq_assert (nbytes == sizeof (command_t)); } -int zmq::mailbox_t::recv (command_t *cmd_, bool block_) +int zmq::mailbox_t::recv (command_t *cmd_, int timeout_) { + // If there's a finite timeout, poll on the fd. + if (timeout_ > 0) + return recv_timeout (cmd_, timeout_); + #ifdef MSG_DONTWAIT // Attempt to read an entire command. Returns EAGAIN if non-blocking // mode is requested and a command is not available. ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), - block_ ? 0 : MSG_DONTWAIT); + timeout_ < 0 ? 0 : MSG_DONTWAIT); if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) return -1; #else // If required, set the reader to blocking mode. - if (block_) { + if (timeout_ < 0) { int flags = fcntl (r, F_GETFL, 0); errno_assert (flags >= 0); int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK); @@ -223,7 +256,7 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_) err = errno; // Re-set the reader to non-blocking mode. - if (block_) { + if (timeout_ < 0) { int flags = fcntl (r, F_GETFL, 0); errno_assert (flags >= 0); int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); @@ -380,3 +413,60 @@ int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_) #endif } +int zmq::mailbox_t::recv_timeout (command_t *cmd_, int timeout_) +{ +#ifdef ZMQ_RCVTIMEO_BASED_ON_POLL + + struct pollfd pfd; + pfd.fd = r; + pfd.events = POLLIN; + int rc = poll (&pfd, 1, timeout_); + if (unlikely (rc < 0)) { + zmq_assert (errno == EINTR); + return -1; + } + else if (unlikely (rc == 0)) { + errno = EAGAIN; + return -1; + } + zmq_assert (rc == 1); + zmq_assert (pfd.revents & POLLIN); + +#elif defined ZMQ_RCVTIMEO_BASED_ON_SELECT + + fd_set fds; + FD_ZERO (&fds); + FD_SET (r, &fds); + struct timeval timeout; + timeout.tv_sec = timeout_ / 1000; + timeout.tv_usec = timeout_ % 1000 * 1000; + int rc = select (r + 1, &fds, NULL, NULL, &timeout); + if (unlikely (rc < 0)) { + zmq_assert (errno == EINTR); + return -1; + } + else if (unlikely (rc == 0)) { + errno = EAGAIN; + return -1; + } + zmq_assert (rc == 1); + +#else +#error +#endif + + // The file descriptor is ready for reading. Extract one command out of it. + ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0); + if (unlikely (rc < 0 && errno == EINTR)) + return -1; + zmq_assert (nbytes == sizeof (command_t)); + return 0; +} + +#if defined ZMQ_RCVTIMEO_BASED_ON_SELECT +#undef ZMQ_RCVTIMEO_BASED_ON_SELECT +#endif +#if defined ZMQ_RCVTIMEO_BASED_ON_POLL +#undef ZMQ_RCVTIMEO_BASED_ON_POLL +#endif + diff --git a/src/mailbox.hpp b/src/mailbox.hpp index 96bf4eb..1b54aac 100644 --- a/src/mailbox.hpp +++ b/src/mailbox.hpp @@ -41,7 +41,7 @@ namespace zmq fd_t get_fd (); void send (const command_t &cmd_); - int recv (command_t *cmd_, bool block_); + int recv (command_t *cmd_, int timeout_); private: @@ -52,6 +52,11 @@ namespace zmq // Platform-dependent function to create a socketpair. static int make_socketpair (fd_t *r_, fd_t *w_); + // Receives a command with the specific timeout. + // This function is not to be used for non-blocking or inifinitely + // blocking recvs. + int recv_timeout (command_t *cmd_, int timeout_); + // Disable copying of mailbox_t object. mailbox_t (const mailbox_t&); const mailbox_t &operator = (const mailbox_t&); diff --git a/src/options.cpp b/src/options.cpp index 29cf023..80ab294 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -39,6 +39,8 @@ zmq::options_t::options_t () : backlog (100), maxmsgsize (-1), filter (1), + rcvtimeo (-1), + sndtimeo (-1), immediate_connect (true) { } @@ -182,6 +184,22 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, filter = *((int*) optval_); return 0; + case ZMQ_RCVTIMEO: + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + rcvtimeo = *((int*) optval_); + return 0; + + case ZMQ_SNDTIMEO: + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + sndtimeo = *((int*) optval_); + return 0; + } errno = EINVAL; @@ -336,6 +354,24 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (int); return 0; + case ZMQ_RCVTIMEO: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = rcvtimeo; + *optvallen_ = sizeof (int); + return 0; + + case ZMQ_SNDTIMEO: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = sndtimeo; + *optvallen_ = sizeof (int); + return 0; + } errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index e055919..858ec2e 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -78,6 +78,10 @@ namespace zmq // If 1, (X)SUB socket should filter the messages. If 0, it should not. int filter; + // The timeout for send/recv operations for this socket. + int rcvtimeo; + int sndtimeo; + // If true, when connecting, pipes are created immediately without // waiting for the connection to be established. That way the socket // is not aware of the peer's identity, however, it is able to send diff --git a/src/reaper.cpp b/src/reaper.cpp index 0295137..4c67b37 100644 --- a/src/reaper.cpp +++ b/src/reaper.cpp @@ -61,7 +61,7 @@ void zmq::reaper_t::in_event () // Get the next command. If there is none, exit. command_t cmd; - int rc = mailbox.recv (&cmd, false); + int rc = mailbox.recv (&cmd, 0); if (rc != 0 && errno == EINTR) continue; if (rc != 0 && errno == EAGAIN) diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 2b1d8af..dc6b5f5 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -288,7 +288,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, errno = EINVAL; return -1; } - int rc = process_commands (false, false); + int rc = process_commands (0, false); if (rc != 0 && (errno == EINTR || errno == ETERM)) return -1; errno_assert (rc == 0); @@ -475,7 +475,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) } // Process pending commands, if any. - int rc = process_commands (false, true); + int rc = process_commands (0, true); if (unlikely (rc != 0)) return -1; @@ -487,20 +487,38 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) rc = xsend (msg_, flags_); if (rc == 0) return 0; + if (unlikely (errno != EAGAIN)) + return -1; // In case of non-blocking send we'll simply propagate - // the error - including EAGAIN - upwards. - if (flags_ & ZMQ_DONTWAIT) + // the error - including EAGAIN - up the stack. + if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) return -1; + // Compute the time when the timeout should occur. + // If the timeout is infite, don't care. + clock_t clock ; + int timeout = options.sndtimeo; + uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout); + // Oops, we couldn't send the message. Wait for the next // command, process it and try to send the message again. - while (rc != 0) { - if (errno != EAGAIN) - return -1; - if (unlikely (process_commands (true, false) != 0)) + // If timeout is reached in the meantime, return EAGAIN. + while (true) { + if (unlikely (process_commands (timeout, false) != 0)) return -1; rc = xsend (msg_, flags_); + if (rc == 0) + break; + if (unlikely (errno != EAGAIN)) + return -1; + if (timeout > 0) { + timeout = end - clock.now_ms (); + if (timeout <= 0) { + errno = EAGAIN; + return -1; + } + } } return 0; } @@ -521,7 +539,8 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) // Get the message. int rc = xrecv (msg_, flags_); - int err = errno; + if (unlikely (rc != 0 && errno != EAGAIN)) + return -1; // Once every inbound_poll_rate messages check for signals and process // incoming commands. This happens only if we are not polling altogether @@ -532,7 +551,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) // described above) from the one used by 'send'. This is because counting // ticks is more efficient than doing RDTSC all the time. if (++ticks == inbound_poll_rate) { - if (unlikely (process_commands (false, false) != 0)) + if (unlikely (process_commands (0, false) != 0)) return -1; ticks = 0; } @@ -545,17 +564,12 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) return 0; } - // If we don't have the message, restore the original cause of the problem. - errno = err; - // If the message cannot be fetched immediately, there are two scenarios. // For non-blocking recv, commands are processed in case there's an // activate_reader command already waiting int a command pipe. // If it's not, return EAGAIN. - if (flags_ & ZMQ_DONTWAIT) { - if (errno != EAGAIN) - return -1; - if (unlikely (process_commands (false, false) != 0)) + if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) { + if (unlikely (process_commands (0, false) != 0)) return -1; ticks = 0; @@ -568,17 +582,33 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) return rc; } + // Compute the time when the timeout should occur. + // If the timeout is infite, don't care. + clock_t clock ; + int timeout = options.rcvtimeo; + uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout); + // In blocking scenario, commands are processed over and over again until // we are able to fetch a message. bool block = (ticks != 0); - while (rc != 0) { - if (errno != EAGAIN) - return -1; - if (unlikely (process_commands (block, false) != 0)) + while (true) { + if (unlikely (process_commands (block ? timeout : 0, false) != 0)) return -1; rc = xrecv (msg_, flags_); - ticks = 0; + if (rc == 0) { + ticks = 0; + break; + } + if (unlikely (errno != EAGAIN)) + return -1; block = true; + if (timeout > 0) { + timeout = end - clock.now_ms (); + if (timeout <= 0) { + errno = EAGAIN; + return -1; + } + } } rcvmore = msg_->flags () & msg_t::more; @@ -658,18 +688,20 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_) check_destroy (); } -int zmq::socket_base_t::process_commands (bool block_, bool throttle_) +int zmq::socket_base_t::process_commands (int timeout_, bool throttle_) { int rc; command_t cmd; - if (block_) { - rc = mailbox.recv (&cmd, true); - if (rc == -1 && errno == EINTR) - return -1; - errno_assert (rc == 0); + if (timeout_ != 0) { + + // If we are asked to wait, simply ask mailbox to wait. + rc = mailbox.recv (&cmd, timeout_); } else { + // If we are asked not to wait, check whether we haven't processed + // commands recently, so that we can throttle the new commands. + // Get the CPU's tick counter. If 0, the counter is not available. uint64_t tsc = zmq::clock_t::rdtsc (); @@ -690,7 +722,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) } // Check whether there are any commands pending for this thread. - rc = mailbox.recv (&cmd, false); + rc = mailbox.recv (&cmd, 0); } // Process all the commands available at the moment. @@ -701,7 +733,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) return -1; errno_assert (rc == 0); cmd.destination->process_command (cmd); - rc = mailbox.recv (&cmd, false); + rc = mailbox.recv (&cmd, 0); } if (ctx_terminated) { @@ -797,9 +829,11 @@ void zmq::socket_base_t::xhiccuped (pipe_t *pipe_) void zmq::socket_base_t::in_event () { - // Process any commands from other threads/sockets that may be available - // at the moment. Ultimately, socket will be destroyed. - process_commands (false, false); + // This function is invoked only once the socket is running in the context + // of the reaper thread. Process any commands from other threads/sockets + // that may be available at the moment. Ultimately, the socket will + // be destroyed. + process_commands (0, false); check_destroy (); } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index ed5620c..69a8aac 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -160,11 +160,11 @@ namespace zmq // Register the pipe with this socket. void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); - // Processes commands sent to this socket (if any). If 'block' is - // set to true, returns only after at least one command was processed. + // Processes commands sent to this socket (if any). If timeout is -1, + // returns only after at least one command was processed. // If throttle argument is true, commands are processed at most once // in a predefined time period. - int process_commands (bool block_, bool throttle_); + int process_commands (int timeout_, bool throttle_); // Handlers for incoming commands. void process_stop (); diff --git a/tests/Makefile.am b/tests/Makefile.am index ebbc46c..b26cccc 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -5,7 +5,8 @@ noinst_PROGRAMS = test_pair_inproc \ test_pair_tcp \ test_reqrep_inproc \ test_reqrep_tcp \ - test_hwm + test_hwm \ + test_timeo if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -21,6 +22,8 @@ test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp test_hwm_SOURCES = test_hwm.cpp +test_timeo_SOURCES = test_timeo.cpp + if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp diff --git a/tests/test_timeo.cpp b/tests/test_timeo.cpp new file mode 100644 index 0000000..a8a3fc0 --- /dev/null +++ b/tests/test_timeo.cpp @@ -0,0 +1,115 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <assert.h> +#include <string.h> +#include <pthread.h> + +#include "../include/zmq.h" +#include "../include/zmq_utils.h" + +extern "C" +{ + void *worker(void *ctx) + { + // Worker thread connects after delay of 1 second. Then it waits + // for 1 more second, so that async connect has time to succeed. + zmq_sleep (1); + void *sc = zmq_socket (ctx, ZMQ_PUSH); + assert (sc); + int rc = zmq_connect (sc, "inproc://timeout_test"); + assert (rc == 0); + zmq_sleep (1); + rc = zmq_close (sc); + assert (rc == 0); + return NULL; + } +} + +int main (int argc, char *argv []) +{ + void *ctx = zmq_init (1); + assert (ctx); + + // Create a disconnected socket. + void *sb = zmq_socket (ctx, ZMQ_PULL); + assert (sb); + int rc = zmq_bind (sb, "inproc://timeout_test"); + assert (rc == 0); + + // Check whether non-blocking recv returns immediately. + char buf [] = "12345678ABCDEFGH12345678abcdefgh"; + rc = zmq_recv (sb, buf, 32, ZMQ_DONTWAIT); + assert (rc == -1); + assert (zmq_errno() == EAGAIN); + + // Check whether recv timeout is honoured. + int timeout = 500; + size_t timeout_size = sizeof timeout; + rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size); + assert (rc == 0); + void *watch = zmq_stopwatch_start (); + rc = zmq_recv (sb, buf, 32, 0); + assert (rc == -1); + assert (zmq_errno () == EAGAIN); + unsigned long elapsed = zmq_stopwatch_stop (watch); + assert (elapsed > 440000 && elapsed < 550000); + + // Check whether connection during the wait doesn't distort the timeout. + timeout = 2000; + rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size); + assert (rc == 0); + pthread_t thread; + rc = pthread_create (&thread, NULL, worker, ctx); + assert (rc == 0); + watch = zmq_stopwatch_start (); + rc = zmq_recv (sb, buf, 32, 0); + assert (rc == -1); + assert (zmq_errno () == EAGAIN); + elapsed = zmq_stopwatch_stop (watch); + assert (elapsed > 1900000 && elapsed < 2100000); + rc = pthread_join (thread, NULL); + assert (rc == 0); + + // Check that timeouts don't break normal message transfer. + void *sc = zmq_socket (ctx, ZMQ_PUSH); + assert (sc); + rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size); + assert (rc == 0); + rc = zmq_setsockopt(sb, ZMQ_SNDTIMEO, &timeout, timeout_size); + assert (rc == 0); + rc = zmq_connect (sc, "inproc://timeout_test"); + assert (rc == 0); + rc = zmq_send (sc, buf, 32, 0); + assert (rc == 32); + rc = zmq_recv (sb, buf, 32, 0); + assert (rc == 32); + + // Clean-up. + rc = zmq_close (sc); + assert (rc == 0); + rc = zmq_close (sb); + assert (rc == 0); + rc = zmq_term (ctx); + assert (rc == 0); + + return 0 ; +} + |