diff options
-rw-r--r-- | doc/zmq_getsockopt.txt | 20 | ||||
-rw-r--r-- | doc/zmq_setsockopt.txt | 24 | ||||
-rw-r--r-- | include/zmq.h | 1 | ||||
-rw-r--r-- | src/options.cpp | 18 | ||||
-rw-r--r-- | src/options.hpp | 2 | ||||
-rw-r--r-- | src/pgm_socket.cpp | 32 |
6 files changed, 83 insertions, 14 deletions
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index 7f73e1c..132e7b3 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -167,6 +167,26 @@ Default value:: 10 Applicable socket types:: all, when using multicast transports +ZMQ_RECOVERY_IVL_MSEC: Get multicast recovery interval in milliseconds +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_RECOVERY_IVL'_MSEC option shall retrieve the recovery interval, in +milliseconds, for multicast transports using the specified 'socket'. The +recovery interval determines the maximum time in seconds that a receiver +can be absent from a multicast group before unrecoverable data loss will +occur. + +For backward compatibility, the default value of 'ZMQ_RECOVERY_IVL_MSEC' is +-1 indicating that the recovery interval should be obtained from the +'ZMQ_RECOVERY_IVL' option. However, if the 'ZMQ_RECOVERY_IVL_MSEC' value is +not zero, then it will take precedence, and be used. + +[horizontal] +Option value type:: int64_t +Option value unit:: milliseconds +Default value:: -1 +Applicable socket types:: all, when using multicast transports + + ZMQ_MCAST_LOOP: Control multicast loop-back ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_MCAST_LOOP' option controls whether data sent via multicast diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 86b01e4..58f04b3 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -171,6 +171,30 @@ Default value:: 10 Applicable socket types:: all, when using multicast transports +ZMQ_RECOVERY_IVL_MSEC: Set multicast recovery interval in milliseconds +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_RECOVERY_IVL_MSEC' option shall set the recovery interval, specified +in milliseconds (ms) for multicast transports using the specified 'socket'. +The recovery interval determines the maximum time in milliseconds that a +receiver can be absent from a multicast group before unrecoverable data loss +will occur. + +A non-zero value of the 'ZMQ_RECOVERY_IVL_MSEC' option will take precedence +over the 'ZMQ_RECOVERY_IVL' option, but since the default for the +'ZMQ_RECOVERY_IVL_MSEC' is -1, the default is to use the 'ZMQ_RECOVERY_IVL' +option value. + +CAUTION: Exercise care when setting large recovery intervals as the data +needed for recovery will be held in memory. For example, a 1 minute recovery +interval at a data rate of 1Gbps requires a 7GB in-memory buffer. + +[horizontal] +Option value type:: int64_t +Option value unit:: milliseconds +Default value:: -1 +Applicable socket types:: all, when using multicast transports + + ZMQ_MCAST_LOOP: Control multicast loop-back ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_MCAST_LOOP' option shall control whether data sent via multicast diff --git a/include/zmq.h b/include/zmq.h index 997595b..a773f45 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -202,6 +202,7 @@ ZMQ_EXPORT int zmq_term (void *context); #define ZMQ_LINGER 17 #define ZMQ_RECONNECT_IVL 18 #define ZMQ_BACKLOG 19 +#define ZMQ_RECOVERY_IVL_MSEC 20 /* opt. recovery time, reconcile in 3.x */ /* Send/recv options. */ #define ZMQ_NOBLOCK 1 diff --git a/src/options.cpp b/src/options.cpp index b4ca6b5..ae35059 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -30,6 +30,7 @@ zmq::options_t::options_t () : affinity (0), rate (100), recovery_ivl (10), + recovery_ivl_msec (-1), use_multicast_loop (true), sndbuf (0), rcvbuf (0), @@ -101,6 +102,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, recovery_ivl = (uint32_t) *((int64_t*) optval_); return 0; + case ZMQ_RECOVERY_IVL_MSEC: + if (optvallen_ != sizeof (int64_t) || *((int64_t*) optval_) < 0) { + errno = EINVAL; + return -1; + } + recovery_ivl_msec = (int32_t) *((int64_t*) optval_); + return 0; + case ZMQ_MCAST_LOOP: if (optvallen_ != sizeof (int64_t)) { errno = EINVAL; @@ -225,6 +234,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (int64_t); return 0; + case ZMQ_RECOVERY_IVL_MSEC: + if (*optvallen_ < sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + *((int64_t*) optval_) = recovery_ivl_msec; + *optvallen_ = sizeof (int64_t); + return 0; + case ZMQ_MCAST_LOOP: if (*optvallen_ < sizeof (int64_t)) { errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index 2c6f65d..e6df505 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -44,6 +44,8 @@ namespace zmq // Reliability time interval [s]. Default 10s. uint32_t recovery_ivl; + // Reliability time interval [ms]. Default -1 = not used. + int32_t recovery_ivl_msec; // Enable multicast loopback. Default disabled (false). bool use_multicast_loop; diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 1c98edd..4c92f97 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -89,8 +89,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) errno = EINVAL; return -1; } - // Recovery interval [s]. - if (options.recovery_ivl <= 0) { + // Recovery interval [s] or [ms] - based on the user's call + if ((options.recovery_ivl <= 0) && (options.recovery_ivl_msec <= 0)) { errno = EINVAL; return -1; } @@ -199,8 +199,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (receiver) { const int recv_only = 1, - rxw_max_rte = options.rate * 1000 / 8, - rxw_secs = options.recovery_ivl, + rxw_max_tpdu = (int) pgm_max_tpdu, + rxw_sqns = (options.recovery_ivl_msec >= 0 ? + options.recovery_ivl_msec * options.rate / + (1000 * rxw_max_tpdu) : + options.recovery_ivl * options.rate / + rxw_max_tpdu), peer_expiry = pgm_secs (300), spmr_expiry = pgm_msecs (25), nak_bo_ivl = pgm_msecs (50), @@ -211,10 +215,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only, sizeof (recv_only)) || - !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_MAX_RTE, &rxw_max_rte, - sizeof (rxw_max_rte)) || - !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SECS, &rxw_secs, - sizeof (rxw_secs)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns, + sizeof (rxw_sqns)) || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, sizeof (peer_expiry)) || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, @@ -232,8 +234,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) goto err_abort; } else { const int send_only = 1, - txw_max_rte = options.rate * 1000 / 8, - txw_secs = options.recovery_ivl, + txw_max_tpdu = (int) pgm_max_tpdu, + txw_sqns = (options.recovery_ivl_msec >= 0 ? + options.recovery_ivl_msec * options.rate / + (1000 * txw_max_tpdu) : + options.recovery_ivl * options.rate / + txw_max_tpdu), ambient_spm = pgm_secs (30), heartbeat_spm[] = { pgm_msecs (100), pgm_msecs (100), @@ -247,10 +253,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY, &send_only, sizeof (send_only)) || - !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_MAX_RTE, - &txw_max_rte, sizeof (txw_max_rte)) || - !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SECS, - &txw_secs, sizeof (txw_secs)) || + !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS, + &txw_sqns, sizeof (txw_sqns)) || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM, &ambient_spm, sizeof (ambient_spm)) || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM, |