diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-02-21 11:22:54 +0100 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-02-21 11:22:54 +0100 |
commit | 5c0931121ba1854766599b6b7dbd1a7937febd6d (patch) | |
tree | b0673b15736c118810f018cc6f1c2130a2bcf1ae | |
parent | 12486fecc4f8d9a3bed37cf3d732b1250b075e24 (diff) |
Computation of buffer size for PGM fixed.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r-- | src/pgm_socket.cpp | 41 | ||||
-rw-r--r-- | src/pgm_socket.hpp | 3 |
2 files changed, 32 insertions, 12 deletions
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 4c92f97..9f96f6f 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -151,8 +151,9 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) // All options are of data type int const int encapsulation_port = port_number; if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, - &encapsulation_port, sizeof (encapsulation_port)) || - !pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, + &encapsulation_port, sizeof (encapsulation_port))) + goto err_abort; + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, &encapsulation_port, sizeof (encapsulation_port))) goto err_abort; } @@ -200,11 +201,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (receiver) { const int recv_only = 1, rxw_max_tpdu = (int) pgm_max_tpdu, - rxw_sqns = (options.recovery_ivl_msec >= 0 ? - options.recovery_ivl_msec * options.rate / - (1000 * rxw_max_tpdu) : - options.recovery_ivl * options.rate / - rxw_max_tpdu), + rxw_sqns = compute_sqns (rxw_max_tpdu), peer_expiry = pgm_secs (300), spmr_expiry = pgm_msecs (25), nak_bo_ivl = pgm_msecs (50), @@ -235,11 +232,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) } else { const int send_only = 1, txw_max_tpdu = (int) pgm_max_tpdu, - txw_sqns = (options.recovery_ivl_msec >= 0 ? - options.recovery_ivl_msec * options.rate / - (1000 * txw_max_tpdu) : - options.recovery_ivl * options.rate / - txw_max_tpdu), + txw_sqns = compute_sqns (txw_max_tpdu), ambient_spm = pgm_secs (30), heartbeat_spm[] = { pgm_msecs (100), pgm_msecs (100), @@ -680,5 +673,29 @@ void zmq::pgm_socket_t::process_upstream () errno = EAGAIN; } +int zmq::pgm_socket_t::compute_sqns (int tpdu_) +{ + // Convert rate into B/ms. + uint64_t rate = ((uint64_t) options.rate) / 8; + + // Get recovery interval in milliseconds. + uint64_t interval = options.recovery_ivl_msec >= 0 ? + options.recovery_ivl_msec : + options.recovery_ivl * 1000; + + // Compute the size of the buffer in bytes. + uint64_t size = interval * rate; + + // Translate the size into number of packets. + uint64_t sqns = size / tpdu_; + zmq_assert (sqns >= 0); + + // Buffer should be able to contain at least one packet. + if (sqns == 0) + sqns = 1; + + return sqns; +} + #endif diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index e6216de..dad8d6d 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -80,6 +80,9 @@ namespace zmq void process_upstream (); private: + + // Compute size of the buffer based on rate and recovery interval. + int compute_sqns (int tpdu_); // OpenPGM transport. pgm_sock_t* sock; |