diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-03-24 16:47:33 +0100 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-03-24 16:47:33 +0100 |
commit | bc4a1ce3345f4e5904e4b13c618f90def21256a5 (patch) | |
tree | 0e95c952f0a5464f5edb515e7d16644e76515a85 | |
parent | 507718ee1a56e376c06389c513de3868297fec35 (diff) |
ZMQ_HWM split into ZMQ_SNDHWM and ZMQ_RCVHWM
These new options allow to control the maximum size of the
inbound and outbound message pipe separately.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r-- | doc/zmq_getsockopt.txt | 42 | ||||
-rw-r--r-- | doc/zmq_setsockopt.txt | 36 | ||||
-rw-r--r-- | include/zmq.h | 3 | ||||
-rw-r--r-- | src/options.cpp | 30 | ||||
-rw-r--r-- | src/options.hpp | 5 | ||||
-rw-r--r-- | src/session.cpp | 6 | ||||
-rw-r--r-- | src/socket_base.cpp | 21 | ||||
-rw-r--r-- | tests/test_hwm.cpp | 4 |
8 files changed, 109 insertions, 38 deletions
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index e23eaac..c5dd968 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -57,12 +57,12 @@ Default value:: N/A Applicable socket types:: all -ZMQ_HWM: Retrieve high water mark -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_HWM' option shall retrieve the high water mark for the specified -'socket'. The high water mark is a hard limit on the maximum number of -outstanding messages 0MQ shall queue in memory for any single peer that the -specified 'socket' is communicating with. +ZMQ_SNDHWM: Set high water mark for outbound messages +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_SNDHWM' option shall set the high water mark for outbound messages on +the specified 'socket'. The high water mark is a hard limit on the maximum +number of outstanding messages 0MQ shall queue in memory for any single peer +that the specified 'socket' is communicating with. If this limit has been reached the socket shall enter an exceptional state and depending on the socket type, 0MQ shall take appropriate action such as @@ -70,7 +70,29 @@ blocking or dropping sent messages. Refer to the individual socket descriptions in linkzmq:zmq_socket[3] for details on the exact action taken for each socket type. -The default 'ZMQ_HWM' value of zero means "no limit". +The default 'ZMQ_SNDHWM' value of zero means "no limit". + +[horizontal] +Option value type:: int +Option value unit:: messages +Default value:: 0 +Applicable socket types:: all + + +ZMQ_RCVHWM: Set high water mark for inbound messages +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_RCVHWM' option shall set the high water mark for inbound messages on +the specified 'socket'. The high water mark is a hard limit on the maximum +number of outstanding messages 0MQ shall queue in memory for any single peer +that the specified 'socket' is communicating with. + +If this limit has been reached the socket shall enter an exceptional state and +depending on the socket type, 0MQ shall take appropriate action such as +blocking or dropping sent messages. Refer to the individual socket descriptions +in linkzmq:zmq_socket[3] for details on the exact action taken for each socket +type. + +The default 'ZMQ_RCVHWM' value of zero means "no limit". [horizontal] Option value type:: int @@ -348,9 +370,9 @@ EXAMPLE .Retrieving the high water mark ---- /* Retrieve high water mark into hwm */ -int hwm; -size_t hwm_size = sizeof (hwm); -rc = zmq_getsockopt (socket, ZMQ_HWM, &hwm, &hwm_size); +int sndhwm; +size_t sndhwm_size = sizeof (sndhwm); +rc = zmq_getsockopt (socket, ZMQ_SNDHWM, &sndhwm, &sndhwm_size); assert (rc == 0); ---- diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index b16bab4..48afbfc 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -25,12 +25,12 @@ argument is the size of the option value in bytes. The following socket options can be set with the _zmq_setsockopt()_ function: -ZMQ_HWM: Set high water mark -~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_HWM' option shall set the high water mark for the specified 'socket'. -The high water mark is a hard limit on the maximum number of outstanding -messages 0MQ shall queue in memory for any single peer that the specified -'socket' is communicating with. +ZMQ_SNDHWM: Set high water mark for outbound messages +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_SNDHWM' option shall set the high water mark for outbound messages on +the specified 'socket'. The high water mark is a hard limit on the maximum +number of outstanding messages 0MQ shall queue in memory for any single peer +that the specified 'socket' is communicating with. If this limit has been reached the socket shall enter an exceptional state and depending on the socket type, 0MQ shall take appropriate action such as @@ -38,7 +38,29 @@ blocking or dropping sent messages. Refer to the individual socket descriptions in linkzmq:zmq_socket[3] for details on the exact action taken for each socket type. -The default 'ZMQ_HWM' value of zero means "no limit". +The default 'ZMQ_SNDHWM' value of zero means "no limit". + +[horizontal] +Option value type:: int +Option value unit:: messages +Default value:: 0 +Applicable socket types:: all + + +ZMQ_RCVHWM: Set high water mark for inbound messages +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_RCVHWM' option shall set the high water mark for inbound messages on +the specified 'socket'. The high water mark is a hard limit on the maximum +number of outstanding messages 0MQ shall queue in memory for any single peer +that the specified 'socket' is communicating with. + +If this limit has been reached the socket shall enter an exceptional state and +depending on the socket type, 0MQ shall take appropriate action such as +blocking or dropping sent messages. Refer to the individual socket descriptions +in linkzmq:zmq_socket[3] for details on the exact action taken for each socket +type. + +The default 'ZMQ_RCVHWM' value of zero means "no limit". [horizontal] Option value type:: int diff --git a/include/zmq.h b/include/zmq.h index 27aeb29..4c37c03 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -183,7 +183,6 @@ ZMQ_EXPORT int zmq_term (void *context); #define ZMQ_XSUB 10 /* Socket options. */ -#define ZMQ_HWM 1 #define ZMQ_AFFINITY 4 #define ZMQ_IDENTITY 5 #define ZMQ_SUBSCRIBE 6 @@ -201,6 +200,8 @@ ZMQ_EXPORT int zmq_term (void *context); #define ZMQ_BACKLOG 19 #define ZMQ_RECONNECT_IVL_MAX 21 #define ZMQ_MAXMSGSIZE 22 +#define ZMQ_SNDHWM 23 +#define ZMQ_RCVHWM 24 /* Send/recv options. */ #define ZMQ_NOBLOCK 1 diff --git a/src/options.cpp b/src/options.cpp index 39f8984..556ffd8 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -26,7 +26,8 @@ #include "err.hpp" zmq::options_t::options_t () : - hwm (0), + sndhwm (0), + rcvhwm (0), affinity (0), rate (100), recovery_ivl (10000), @@ -49,12 +50,20 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, { switch (option_) { - case ZMQ_HWM: + case ZMQ_SNDHWM: if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) { errno = EINVAL; return -1; } - hwm = *((int*) optval_); + sndhwm = *((int*) optval_); + return 0; + + case ZMQ_RCVHWM: + if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) { + errno = EINVAL; + return -1; + } + rcvhwm = *((int*) optval_); return 0; case ZMQ_AFFINITY: @@ -168,13 +177,22 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) { switch (option_) { - case ZMQ_HWM: + case ZMQ_SNDHWM: if (*optvallen_ < sizeof (int)) { errno = EINVAL; return -1; } - *((int*) optval_) = hwm; - *optvallen_ = sizeof (uint64_t); + *((int*) optval_) = sndhwm; + *optvallen_ = sizeof (int); + return 0; + + case ZMQ_RCVHWM: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = rcvhwm; + *optvallen_ = sizeof (int); return 0; case ZMQ_AFFINITY: diff --git a/src/options.hpp b/src/options.hpp index 971e643..9ba06e3 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -35,8 +35,9 @@ namespace zmq int setsockopt (int option_, const void *optval_, size_t optvallen_); int getsockopt (int option_, void *optval_, size_t *optvallen_); - // High-water mark for messages in pipe. - int hwm; + // High-water marks for message pipes. + int sndhwm; + int rcvhwm; uint64_t affinity; blob_t identity; diff --git a/src/session.cpp b/src/session.cpp index 176f0ef..5f970cc 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -247,11 +247,13 @@ void zmq::session_t::process_attach (i_engine *engine_, // Create the pipes, as required. if (options.requires_in) { - create_pipe (socket, this, options.hwm, &socket_reader, &out_pipe); + create_pipe (socket, this, options.rcvhwm, &socket_reader, + &out_pipe); out_pipe->set_event_sink (this); } if (options.requires_out) { - create_pipe (this, socket, options.hwm, &in_pipe, &socket_writer); + create_pipe (this, socket, options.sndhwm, &in_pipe, + &socket_writer); in_pipe->set_event_sink (this); } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 374e342..bf873b6 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -377,20 +377,25 @@ int zmq::socket_base_t::connect (const char *addr_) // The total HWM for an inproc connection should be the sum of // the binder's HWM and the connector's HWM. - int hwm; - if (options.hwm == 0 || peer.options.hwm == 0) - hwm = 0; + int sndhwm; + int rcvhwm; + if (options.sndhwm == 0 || peer.options.rcvhwm == 0) + sndhwm = 0; else - hwm = options.hwm + peer.options.hwm; + sndhwm = options.sndhwm + peer.options.rcvhwm; + if (options.rcvhwm == 0 || peer.options.sndhwm == 0) + rcvhwm = 0; + else + rcvhwm = options.rcvhwm + peer.options.sndhwm; // Create inbound pipe, if required. if (options.requires_in) - create_pipe (this, peer.socket, hwm, &inpipe_reader, + create_pipe (this, peer.socket, rcvhwm, &inpipe_reader, &inpipe_writer); // Create outbound pipe, if required. if (options.requires_out) - create_pipe (peer.socket, this, hwm, &outpipe_reader, + create_pipe (peer.socket, this, sndhwm, &outpipe_reader, &outpipe_writer); // Attach the pipes to this socket object. @@ -429,12 +434,12 @@ int zmq::socket_base_t::connect (const char *addr_) // Create inbound pipe, if required. if (options.requires_in) - create_pipe (this, session, options.hwm, + create_pipe (this, session, options.rcvhwm, &inpipe_reader, &inpipe_writer); // Create outbound pipe, if required. if (options.requires_out) - create_pipe (session, this, options.hwm, + create_pipe (session, this, options.sndhwm, &outpipe_reader, &outpipe_writer); // Attach the pipes to the socket object. diff --git a/tests/test_hwm.cpp b/tests/test_hwm.cpp index a96193d..9f3aac1 100644 --- a/tests/test_hwm.cpp +++ b/tests/test_hwm.cpp @@ -33,14 +33,14 @@ int main (int argc, char *argv []) void *sb = zmq_socket (ctx, ZMQ_PULL); assert (sb); int hwm = 2; - int rc = zmq_setsockopt (sb, ZMQ_HWM, &hwm, sizeof (hwm)); + int rc = zmq_setsockopt (sb, ZMQ_RCVHWM, &hwm, sizeof (hwm)); assert (rc == 0); rc = zmq_bind (sb, "inproc://a"); assert (rc == 0); void *sc = zmq_socket (ctx, ZMQ_PUSH); assert (sc); - rc = zmq_setsockopt (sc, ZMQ_HWM, &hwm, sizeof (hwm)); + rc = zmq_setsockopt (sc, ZMQ_SNDHWM, &hwm, sizeof (hwm)); assert (rc == 0); rc = zmq_connect (sc, "inproc://a"); assert (rc == 0); |