diff options
author | Bob Beaty <rbeaty@peak6.com> | 2010-12-09 21:42:58 +0100 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-12-09 21:42:58 +0100 |
commit | fcfad5682ed7a7f5108853d2a7039aedfd9a9ac2 (patch) | |
tree | 08fa82d832d06899058a386a8a2dab263f64a5ba /src/pgm_socket.cpp | |
parent | 1d81d2f1d4549c2cd0999c9544b059c29706f260 (diff) |
Added Recovery Interval in Milliseconds
For very high-speed message systems, the memory used for recovery can get to
be very large. The corrent limitation on that reduction is the ZMQ_RECOVERY_IVL
of 1 sec. I added in an additional option ZMQ_RECOVERY_IVL_MSEC, which is the
Recovery Interval in milliseconds. If used, this will override the previous
one, and allow you to set a sub-second recovery interval. If not set, the
default behavior is to use ZMQ_RECOVERY_IVL.
Signed-off-by: Bob Beaty <rbeaty@peak6.com>
Diffstat (limited to 'src/pgm_socket.cpp')
-rw-r--r-- | src/pgm_socket.cpp | 32 |
1 files changed, 18 insertions, 14 deletions
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 1c98edd..4c92f97 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -89,8 +89,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) errno = EINVAL; return -1; } - // Recovery interval [s]. - if (options.recovery_ivl <= 0) { + // Recovery interval [s] or [ms] - based on the user's call + if ((options.recovery_ivl <= 0) && (options.recovery_ivl_msec <= 0)) { errno = EINVAL; return -1; } @@ -199,8 +199,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (receiver) { const int recv_only = 1, - rxw_max_rte = options.rate * 1000 / 8, - rxw_secs = options.recovery_ivl, + rxw_max_tpdu = (int) pgm_max_tpdu, + rxw_sqns = (options.recovery_ivl_msec >= 0 ? + options.recovery_ivl_msec * options.rate / + (1000 * rxw_max_tpdu) : + options.recovery_ivl * options.rate / + rxw_max_tpdu), peer_expiry = pgm_secs (300), spmr_expiry = pgm_msecs (25), nak_bo_ivl = pgm_msecs (50), @@ -211,10 +215,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only, sizeof (recv_only)) || - !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_MAX_RTE, &rxw_max_rte, - sizeof (rxw_max_rte)) || - !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SECS, &rxw_secs, - sizeof (rxw_secs)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns, + sizeof (rxw_sqns)) || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, sizeof (peer_expiry)) || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, @@ -232,8 +234,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) goto err_abort; } else { const int send_only = 1, - txw_max_rte = options.rate * 1000 / 8, - txw_secs = options.recovery_ivl, + txw_max_tpdu = (int) pgm_max_tpdu, + txw_sqns = (options.recovery_ivl_msec >= 0 ? + options.recovery_ivl_msec * options.rate / + (1000 * txw_max_tpdu) : + options.recovery_ivl * options.rate / + txw_max_tpdu), ambient_spm = pgm_secs (30), heartbeat_spm[] = { pgm_msecs (100), pgm_msecs (100), @@ -247,10 +253,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY, &send_only, sizeof (send_only)) || - !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_MAX_RTE, - &txw_max_rte, sizeof (txw_max_rte)) || - !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SECS, - &txw_secs, sizeof (txw_secs)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS, + &txw_sqns, sizeof (txw_sqns)) || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM, &ambient_spm, sizeof (ambient_spm)) || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM, |