summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBob Beaty <rbeaty@peak6.com>2010-12-09 21:42:58 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-12-09 21:42:58 +0100
commitfcfad5682ed7a7f5108853d2a7039aedfd9a9ac2 (patch)
tree08fa82d832d06899058a386a8a2dab263f64a5ba /src
parent1d81d2f1d4549c2cd0999c9544b059c29706f260 (diff)
Added Recovery Interval in Milliseconds
For very high-speed message systems, the memory used for recovery can get to be very large. The corrent limitation on that reduction is the ZMQ_RECOVERY_IVL of 1 sec. I added in an additional option ZMQ_RECOVERY_IVL_MSEC, which is the Recovery Interval in milliseconds. If used, this will override the previous one, and allow you to set a sub-second recovery interval. If not set, the default behavior is to use ZMQ_RECOVERY_IVL. Signed-off-by: Bob Beaty <rbeaty@peak6.com>
Diffstat (limited to 'src')
-rw-r--r--src/options.cpp18
-rw-r--r--src/options.hpp2
-rw-r--r--src/pgm_socket.cpp32
3 files changed, 38 insertions, 14 deletions
diff --git a/src/options.cpp b/src/options.cpp
index b4ca6b5..ae35059 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -30,6 +30,7 @@ zmq::options_t::options_t () :
affinity (0),
rate (100),
recovery_ivl (10),
+ recovery_ivl_msec (-1),
use_multicast_loop (true),
sndbuf (0),
rcvbuf (0),
@@ -101,6 +102,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
recovery_ivl = (uint32_t) *((int64_t*) optval_);
return 0;
+ case ZMQ_RECOVERY_IVL_MSEC:
+ if (optvallen_ != sizeof (int64_t) || *((int64_t*) optval_) < 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ recovery_ivl_msec = (int32_t) *((int64_t*) optval_);
+ return 0;
+
case ZMQ_MCAST_LOOP:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
@@ -225,6 +234,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int64_t);
return 0;
+ case ZMQ_RECOVERY_IVL_MSEC:
+ if (*optvallen_ < sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int64_t*) optval_) = recovery_ivl_msec;
+ *optvallen_ = sizeof (int64_t);
+ return 0;
+
case ZMQ_MCAST_LOOP:
if (*optvallen_ < sizeof (int64_t)) {
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index 2c6f65d..e6df505 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -44,6 +44,8 @@ namespace zmq
// Reliability time interval [s]. Default 10s.
uint32_t recovery_ivl;
+ // Reliability time interval [ms]. Default -1 = not used.
+ int32_t recovery_ivl_msec;
// Enable multicast loopback. Default disabled (false).
bool use_multicast_loop;
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 1c98edd..4c92f97 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -89,8 +89,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
errno = EINVAL;
return -1;
}
- // Recovery interval [s].
- if (options.recovery_ivl <= 0) {
+ // Recovery interval [s] or [ms] - based on the user's call
+ if ((options.recovery_ivl <= 0) && (options.recovery_ivl_msec <= 0)) {
errno = EINVAL;
return -1;
}
@@ -199,8 +199,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if (receiver) {
const int recv_only = 1,
- rxw_max_rte = options.rate * 1000 / 8,
- rxw_secs = options.recovery_ivl,
+ rxw_max_tpdu = (int) pgm_max_tpdu,
+ rxw_sqns = (options.recovery_ivl_msec >= 0 ?
+ options.recovery_ivl_msec * options.rate /
+ (1000 * rxw_max_tpdu) :
+ options.recovery_ivl * options.rate /
+ rxw_max_tpdu),
peer_expiry = pgm_secs (300),
spmr_expiry = pgm_msecs (25),
nak_bo_ivl = pgm_msecs (50),
@@ -211,10 +215,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only,
sizeof (recv_only)) ||
- !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_MAX_RTE, &rxw_max_rte,
- sizeof (rxw_max_rte)) ||
- !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SECS, &rxw_secs,
- sizeof (rxw_secs)) ||
+ !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns,
+ sizeof (rxw_sqns)) ||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry,
sizeof (peer_expiry)) ||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry,
@@ -232,8 +234,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
goto err_abort;
} else {
const int send_only = 1,
- txw_max_rte = options.rate * 1000 / 8,
- txw_secs = options.recovery_ivl,
+ txw_max_tpdu = (int) pgm_max_tpdu,
+ txw_sqns = (options.recovery_ivl_msec >= 0 ?
+ options.recovery_ivl_msec * options.rate /
+ (1000 * txw_max_tpdu) :
+ options.recovery_ivl * options.rate /
+ txw_max_tpdu),
ambient_spm = pgm_secs (30),
heartbeat_spm[] = { pgm_msecs (100),
pgm_msecs (100),
@@ -247,10 +253,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY,
&send_only, sizeof (send_only)) ||
- !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_MAX_RTE,
- &txw_max_rte, sizeof (txw_max_rte)) ||
- !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SECS,
- &txw_secs, sizeof (txw_secs)) ||
+ !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS,
+ &txw_sqns, sizeof (txw_sqns)) ||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,
&ambient_spm, sizeof (ambient_spm)) ||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM,