summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-03-24 16:47:33 +0100
committerMartin Sustrik <sustrik@250bpm.com>2011-03-24 16:47:33 +0100
commitbc4a1ce3345f4e5904e4b13c618f90def21256a5 (patch)
tree0e95c952f0a5464f5edb515e7d16644e76515a85
parent507718ee1a56e376c06389c513de3868297fec35 (diff)
ZMQ_HWM split into ZMQ_SNDHWM and ZMQ_RCVHWM
These new options allow to control the maximum size of the inbound and outbound message pipe separately. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--doc/zmq_getsockopt.txt42
-rw-r--r--doc/zmq_setsockopt.txt36
-rw-r--r--include/zmq.h3
-rw-r--r--src/options.cpp30
-rw-r--r--src/options.hpp5
-rw-r--r--src/session.cpp6
-rw-r--r--src/socket_base.cpp21
-rw-r--r--tests/test_hwm.cpp4
8 files changed, 109 insertions, 38 deletions
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index e23eaac..c5dd968 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -57,12 +57,12 @@ Default value:: N/A
Applicable socket types:: all
-ZMQ_HWM: Retrieve high water mark
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-The 'ZMQ_HWM' option shall retrieve the high water mark for the specified
-'socket'. The high water mark is a hard limit on the maximum number of
-outstanding messages 0MQ shall queue in memory for any single peer that the
-specified 'socket' is communicating with.
+ZMQ_SNDHWM: Set high water mark for outbound messages
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_SNDHWM' option shall set the high water mark for outbound messages on
+the specified 'socket'. The high water mark is a hard limit on the maximum
+number of outstanding messages 0MQ shall queue in memory for any single peer
+that the specified 'socket' is communicating with.
If this limit has been reached the socket shall enter an exceptional state and
depending on the socket type, 0MQ shall take appropriate action such as
@@ -70,7 +70,29 @@ blocking or dropping sent messages. Refer to the individual socket descriptions
in linkzmq:zmq_socket[3] for details on the exact action taken for each socket
type.
-The default 'ZMQ_HWM' value of zero means "no limit".
+The default 'ZMQ_SNDHWM' value of zero means "no limit".
+
+[horizontal]
+Option value type:: int
+Option value unit:: messages
+Default value:: 0
+Applicable socket types:: all
+
+
+ZMQ_RCVHWM: Set high water mark for inbound messages
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_RCVHWM' option shall set the high water mark for inbound messages on
+the specified 'socket'. The high water mark is a hard limit on the maximum
+number of outstanding messages 0MQ shall queue in memory for any single peer
+that the specified 'socket' is communicating with.
+
+If this limit has been reached the socket shall enter an exceptional state and
+depending on the socket type, 0MQ shall take appropriate action such as
+blocking or dropping sent messages. Refer to the individual socket descriptions
+in linkzmq:zmq_socket[3] for details on the exact action taken for each socket
+type.
+
+The default 'ZMQ_RCVHWM' value of zero means "no limit".
[horizontal]
Option value type:: int
@@ -348,9 +370,9 @@ EXAMPLE
.Retrieving the high water mark
----
/* Retrieve high water mark into hwm */
-int hwm;
-size_t hwm_size = sizeof (hwm);
-rc = zmq_getsockopt (socket, ZMQ_HWM, &hwm, &hwm_size);
+int sndhwm;
+size_t sndhwm_size = sizeof (sndhwm);
+rc = zmq_getsockopt (socket, ZMQ_SNDHWM, &sndhwm, &sndhwm_size);
assert (rc == 0);
----
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index b16bab4..48afbfc 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -25,12 +25,12 @@ argument is the size of the option value in bytes.
The following socket options can be set with the _zmq_setsockopt()_ function:
-ZMQ_HWM: Set high water mark
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-The 'ZMQ_HWM' option shall set the high water mark for the specified 'socket'.
-The high water mark is a hard limit on the maximum number of outstanding
-messages 0MQ shall queue in memory for any single peer that the specified
-'socket' is communicating with.
+ZMQ_SNDHWM: Set high water mark for outbound messages
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_SNDHWM' option shall set the high water mark for outbound messages on
+the specified 'socket'. The high water mark is a hard limit on the maximum
+number of outstanding messages 0MQ shall queue in memory for any single peer
+that the specified 'socket' is communicating with.
If this limit has been reached the socket shall enter an exceptional state and
depending on the socket type, 0MQ shall take appropriate action such as
@@ -38,7 +38,29 @@ blocking or dropping sent messages. Refer to the individual socket descriptions
in linkzmq:zmq_socket[3] for details on the exact action taken for each socket
type.
-The default 'ZMQ_HWM' value of zero means "no limit".
+The default 'ZMQ_SNDHWM' value of zero means "no limit".
+
+[horizontal]
+Option value type:: int
+Option value unit:: messages
+Default value:: 0
+Applicable socket types:: all
+
+
+ZMQ_RCVHWM: Set high water mark for inbound messages
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_RCVHWM' option shall set the high water mark for inbound messages on
+the specified 'socket'. The high water mark is a hard limit on the maximum
+number of outstanding messages 0MQ shall queue in memory for any single peer
+that the specified 'socket' is communicating with.
+
+If this limit has been reached the socket shall enter an exceptional state and
+depending on the socket type, 0MQ shall take appropriate action such as
+blocking or dropping sent messages. Refer to the individual socket descriptions
+in linkzmq:zmq_socket[3] for details on the exact action taken for each socket
+type.
+
+The default 'ZMQ_RCVHWM' value of zero means "no limit".
[horizontal]
Option value type:: int
diff --git a/include/zmq.h b/include/zmq.h
index 27aeb29..4c37c03 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -183,7 +183,6 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_XSUB 10
/* Socket options. */
-#define ZMQ_HWM 1
#define ZMQ_AFFINITY 4
#define ZMQ_IDENTITY 5
#define ZMQ_SUBSCRIBE 6
@@ -201,6 +200,8 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_BACKLOG 19
#define ZMQ_RECONNECT_IVL_MAX 21
#define ZMQ_MAXMSGSIZE 22
+#define ZMQ_SNDHWM 23
+#define ZMQ_RCVHWM 24
/* Send/recv options. */
#define ZMQ_NOBLOCK 1
diff --git a/src/options.cpp b/src/options.cpp
index 39f8984..556ffd8 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -26,7 +26,8 @@
#include "err.hpp"
zmq::options_t::options_t () :
- hwm (0),
+ sndhwm (0),
+ rcvhwm (0),
affinity (0),
rate (100),
recovery_ivl (10000),
@@ -49,12 +50,20 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
{
switch (option_) {
- case ZMQ_HWM:
+ case ZMQ_SNDHWM:
if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) {
errno = EINVAL;
return -1;
}
- hwm = *((int*) optval_);
+ sndhwm = *((int*) optval_);
+ return 0;
+
+ case ZMQ_RCVHWM:
+ if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ rcvhwm = *((int*) optval_);
return 0;
case ZMQ_AFFINITY:
@@ -168,13 +177,22 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
{
switch (option_) {
- case ZMQ_HWM:
+ case ZMQ_SNDHWM:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
- *((int*) optval_) = hwm;
- *optvallen_ = sizeof (uint64_t);
+ *((int*) optval_) = sndhwm;
+ *optvallen_ = sizeof (int);
+ return 0;
+
+ case ZMQ_RCVHWM:
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = rcvhwm;
+ *optvallen_ = sizeof (int);
return 0;
case ZMQ_AFFINITY:
diff --git a/src/options.hpp b/src/options.hpp
index 971e643..9ba06e3 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -35,8 +35,9 @@ namespace zmq
int setsockopt (int option_, const void *optval_, size_t optvallen_);
int getsockopt (int option_, void *optval_, size_t *optvallen_);
- // High-water mark for messages in pipe.
- int hwm;
+ // High-water marks for message pipes.
+ int sndhwm;
+ int rcvhwm;
uint64_t affinity;
blob_t identity;
diff --git a/src/session.cpp b/src/session.cpp
index 176f0ef..5f970cc 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -247,11 +247,13 @@ void zmq::session_t::process_attach (i_engine *engine_,
// Create the pipes, as required.
if (options.requires_in) {
- create_pipe (socket, this, options.hwm, &socket_reader, &out_pipe);
+ create_pipe (socket, this, options.rcvhwm, &socket_reader,
+ &out_pipe);
out_pipe->set_event_sink (this);
}
if (options.requires_out) {
- create_pipe (this, socket, options.hwm, &in_pipe, &socket_writer);
+ create_pipe (this, socket, options.sndhwm, &in_pipe,
+ &socket_writer);
in_pipe->set_event_sink (this);
}
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 374e342..bf873b6 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -377,20 +377,25 @@ int zmq::socket_base_t::connect (const char *addr_)
// The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM.
- int hwm;
- if (options.hwm == 0 || peer.options.hwm == 0)
- hwm = 0;
+ int sndhwm;
+ int rcvhwm;
+ if (options.sndhwm == 0 || peer.options.rcvhwm == 0)
+ sndhwm = 0;
else
- hwm = options.hwm + peer.options.hwm;
+ sndhwm = options.sndhwm + peer.options.rcvhwm;
+ if (options.rcvhwm == 0 || peer.options.sndhwm == 0)
+ rcvhwm = 0;
+ else
+ rcvhwm = options.rcvhwm + peer.options.sndhwm;
// Create inbound pipe, if required.
if (options.requires_in)
- create_pipe (this, peer.socket, hwm, &inpipe_reader,
+ create_pipe (this, peer.socket, rcvhwm, &inpipe_reader,
&inpipe_writer);
// Create outbound pipe, if required.
if (options.requires_out)
- create_pipe (peer.socket, this, hwm, &outpipe_reader,
+ create_pipe (peer.socket, this, sndhwm, &outpipe_reader,
&outpipe_writer);
// Attach the pipes to this socket object.
@@ -429,12 +434,12 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
if (options.requires_in)
- create_pipe (this, session, options.hwm,
+ create_pipe (this, session, options.rcvhwm,
&inpipe_reader, &inpipe_writer);
// Create outbound pipe, if required.
if (options.requires_out)
- create_pipe (session, this, options.hwm,
+ create_pipe (session, this, options.sndhwm,
&outpipe_reader, &outpipe_writer);
// Attach the pipes to the socket object.
diff --git a/tests/test_hwm.cpp b/tests/test_hwm.cpp
index a96193d..9f3aac1 100644
--- a/tests/test_hwm.cpp
+++ b/tests/test_hwm.cpp
@@ -33,14 +33,14 @@ int main (int argc, char *argv [])
void *sb = zmq_socket (ctx, ZMQ_PULL);
assert (sb);
int hwm = 2;
- int rc = zmq_setsockopt (sb, ZMQ_HWM, &hwm, sizeof (hwm));
+ int rc = zmq_setsockopt (sb, ZMQ_RCVHWM, &hwm, sizeof (hwm));
assert (rc == 0);
rc = zmq_bind (sb, "inproc://a");
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_PUSH);
assert (sc);
- rc = zmq_setsockopt (sc, ZMQ_HWM, &hwm, sizeof (hwm));
+ rc = zmq_setsockopt (sc, ZMQ_SNDHWM, &hwm, sizeof (hwm));
assert (rc == 0);
rc = zmq_connect (sc, "inproc://a");
assert (rc == 0);