diff options
| author | Bob Beaty <rbeaty@peak6.com> | 2010-12-09 21:42:58 +0100 | 
|---|---|---|
| committer | Martin Sustrik <sustrik@250bpm.com> | 2010-12-09 21:42:58 +0100 | 
| commit | fcfad5682ed7a7f5108853d2a7039aedfd9a9ac2 (patch) | |
| tree | 08fa82d832d06899058a386a8a2dab263f64a5ba /src/pgm_socket.cpp | |
| parent | 1d81d2f1d4549c2cd0999c9544b059c29706f260 (diff) | |
Added Recovery Interval in Milliseconds
For very high-speed message systems, the memory used for recovery can get to
be very large. The corrent limitation on that reduction is the ZMQ_RECOVERY_IVL
of 1 sec. I added in an additional option ZMQ_RECOVERY_IVL_MSEC, which is the
Recovery Interval in milliseconds. If used, this will override the previous
one, and allow you to set a sub-second recovery interval. If not set, the
default behavior is to use ZMQ_RECOVERY_IVL.
Signed-off-by: Bob Beaty <rbeaty@peak6.com>
Diffstat (limited to 'src/pgm_socket.cpp')
| -rw-r--r-- | src/pgm_socket.cpp | 32 | 
1 files changed, 18 insertions, 14 deletions
| diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 1c98edd..4c92f97 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -89,8 +89,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)          errno = EINVAL;          return -1;      } -    //  Recovery interval [s].  -    if (options.recovery_ivl <= 0) { +    //  Recovery interval [s] or [ms] - based on the user's call  +    if ((options.recovery_ivl <= 0) && (options.recovery_ivl_msec <= 0)) {          errno = EINVAL;          return -1;      } @@ -199,8 +199,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)      if (receiver) {          const int recv_only        = 1, -                  rxw_max_rte      = options.rate * 1000 / 8, -                  rxw_secs         = options.recovery_ivl, +                  rxw_max_tpdu     = (int) pgm_max_tpdu, +                  rxw_sqns         = (options.recovery_ivl_msec >= 0 ? +                                      options.recovery_ivl_msec * options.rate / +                                      (1000 * rxw_max_tpdu) : +                                      options.recovery_ivl * options.rate / +                                      rxw_max_tpdu),                    peer_expiry      = pgm_secs (300),                    spmr_expiry      = pgm_msecs (25),                    nak_bo_ivl       = pgm_msecs (50), @@ -211,10 +215,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)          if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only,                  sizeof (recv_only)) || -            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_MAX_RTE, &rxw_max_rte, -                sizeof (rxw_max_rte)) || -            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SECS, &rxw_secs, -                sizeof (rxw_secs)) || +            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns, +                sizeof (rxw_sqns)) ||              !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry,                  sizeof (peer_expiry)) ||              !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, @@ -232,8 +234,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)              goto err_abort;      } else {          const int send_only        = 1, -                  txw_max_rte      = options.rate * 1000 / 8, -                  txw_secs         = options.recovery_ivl, +                  txw_max_tpdu     = (int) pgm_max_tpdu, +                  txw_sqns         = (options.recovery_ivl_msec >= 0 ? +                                      options.recovery_ivl_msec * options.rate / +                                      (1000 * txw_max_tpdu) : +                                      options.recovery_ivl * options.rate / +                                      txw_max_tpdu),                    ambient_spm      = pgm_secs (30),                    heartbeat_spm[]  = { pgm_msecs (100),                                         pgm_msecs (100), @@ -247,10 +253,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)          if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY,                  &send_only, sizeof (send_only)) || -            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_MAX_RTE, -                &txw_max_rte, sizeof (txw_max_rte)) || -            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SECS, -                &txw_secs, sizeof (txw_secs)) || +            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS, +                &txw_sqns, sizeof (txw_sqns)) ||              !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,                  &ambient_spm, sizeof (ambient_spm)) ||              !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM, | 
