From fcfad5682ed7a7f5108853d2a7039aedfd9a9ac2 Mon Sep 17 00:00:00 2001 From: Bob Beaty Date: Thu, 9 Dec 2010 21:42:58 +0100 Subject: Added Recovery Interval in Milliseconds For very high-speed message systems, the memory used for recovery can get to be very large. The corrent limitation on that reduction is the ZMQ_RECOVERY_IVL of 1 sec. I added in an additional option ZMQ_RECOVERY_IVL_MSEC, which is the Recovery Interval in milliseconds. If used, this will override the previous one, and allow you to set a sub-second recovery interval. If not set, the default behavior is to use ZMQ_RECOVERY_IVL. Signed-off-by: Bob Beaty --- src/options.cpp | 18 ++++++++++++++++++ src/options.hpp | 2 ++ src/pgm_socket.cpp | 32 ++++++++++++++++++-------------- 3 files changed, 38 insertions(+), 14 deletions(-) (limited to 'src') diff --git a/src/options.cpp b/src/options.cpp index b4ca6b5..ae35059 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -30,6 +30,7 @@ zmq::options_t::options_t () : affinity (0), rate (100), recovery_ivl (10), + recovery_ivl_msec (-1), use_multicast_loop (true), sndbuf (0), rcvbuf (0), @@ -101,6 +102,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, recovery_ivl = (uint32_t) *((int64_t*) optval_); return 0; + case ZMQ_RECOVERY_IVL_MSEC: + if (optvallen_ != sizeof (int64_t) || *((int64_t*) optval_) < 0) { + errno = EINVAL; + return -1; + } + recovery_ivl_msec = (int32_t) *((int64_t*) optval_); + return 0; + case ZMQ_MCAST_LOOP: if (optvallen_ != sizeof (int64_t)) { errno = EINVAL; @@ -225,6 +234,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (int64_t); return 0; + case ZMQ_RECOVERY_IVL_MSEC: + if (*optvallen_ < sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + *((int64_t*) optval_) = recovery_ivl_msec; + *optvallen_ = sizeof (int64_t); + return 0; + case ZMQ_MCAST_LOOP: if (*optvallen_ < sizeof (int64_t)) { errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index 2c6f65d..e6df505 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -44,6 +44,8 @@ namespace zmq // Reliability time interval [s]. Default 10s. uint32_t recovery_ivl; + // Reliability time interval [ms]. Default -1 = not used. + int32_t recovery_ivl_msec; // Enable multicast loopback. Default disabled (false). bool use_multicast_loop; diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 1c98edd..4c92f97 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -89,8 +89,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) errno = EINVAL; return -1; } - // Recovery interval [s]. - if (options.recovery_ivl <= 0) { + // Recovery interval [s] or [ms] - based on the user's call + if ((options.recovery_ivl <= 0) && (options.recovery_ivl_msec <= 0)) { errno = EINVAL; return -1; } @@ -199,8 +199,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (receiver) { const int recv_only = 1, - rxw_max_rte = options.rate * 1000 / 8, - rxw_secs = options.recovery_ivl, + rxw_max_tpdu = (int) pgm_max_tpdu, + rxw_sqns = (options.recovery_ivl_msec >= 0 ? + options.recovery_ivl_msec * options.rate / + (1000 * rxw_max_tpdu) : + options.recovery_ivl * options.rate / + rxw_max_tpdu), peer_expiry = pgm_secs (300), spmr_expiry = pgm_msecs (25), nak_bo_ivl = pgm_msecs (50), @@ -211,10 +215,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only, sizeof (recv_only)) || - !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_MAX_RTE, &rxw_max_rte, - sizeof (rxw_max_rte)) || - !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SECS, &rxw_secs, - sizeof (rxw_secs)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns, + sizeof (rxw_sqns)) || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, sizeof (peer_expiry)) || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, @@ -232,8 +234,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) goto err_abort; } else { const int send_only = 1, - txw_max_rte = options.rate * 1000 / 8, - txw_secs = options.recovery_ivl, + txw_max_tpdu = (int) pgm_max_tpdu, + txw_sqns = (options.recovery_ivl_msec >= 0 ? + options.recovery_ivl_msec * options.rate / + (1000 * txw_max_tpdu) : + options.recovery_ivl * options.rate / + txw_max_tpdu), ambient_spm = pgm_secs (30), heartbeat_spm[] = { pgm_msecs (100), pgm_msecs (100), @@ -247,10 +253,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY, &send_only, sizeof (send_only)) || - !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_MAX_RTE, - &txw_max_rte, sizeof (txw_max_rte)) || - !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SECS, - &txw_secs, sizeof (txw_secs)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS, + &txw_sqns, sizeof (txw_sqns)) || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM, &ambient_spm, sizeof (ambient_spm)) || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM, -- cgit v1.2.3