summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-02-21 11:22:54 +0100
committerMartin Sustrik <sustrik@250bpm.com>2011-02-21 11:22:54 +0100
commit5c0931121ba1854766599b6b7dbd1a7937febd6d (patch)
treeb0673b15736c118810f018cc6f1c2130a2bcf1ae
parent12486fecc4f8d9a3bed37cf3d732b1250b075e24 (diff)
Computation of buffer size for PGM fixed.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--src/pgm_socket.cpp41
-rw-r--r--src/pgm_socket.hpp3
2 files changed, 32 insertions, 12 deletions
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 4c92f97..9f96f6f 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -151,8 +151,9 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
// All options are of data type int
const int encapsulation_port = port_number;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT,
- &encapsulation_port, sizeof (encapsulation_port)) ||
- !pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT,
+ &encapsulation_port, sizeof (encapsulation_port)))
+ goto err_abort;
+ if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT,
&encapsulation_port, sizeof (encapsulation_port)))
goto err_abort;
}
@@ -200,11 +201,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if (receiver) {
const int recv_only = 1,
rxw_max_tpdu = (int) pgm_max_tpdu,
- rxw_sqns = (options.recovery_ivl_msec >= 0 ?
- options.recovery_ivl_msec * options.rate /
- (1000 * rxw_max_tpdu) :
- options.recovery_ivl * options.rate /
- rxw_max_tpdu),
+ rxw_sqns = compute_sqns (rxw_max_tpdu),
peer_expiry = pgm_secs (300),
spmr_expiry = pgm_msecs (25),
nak_bo_ivl = pgm_msecs (50),
@@ -235,11 +232,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
} else {
const int send_only = 1,
txw_max_tpdu = (int) pgm_max_tpdu,
- txw_sqns = (options.recovery_ivl_msec >= 0 ?
- options.recovery_ivl_msec * options.rate /
- (1000 * txw_max_tpdu) :
- options.recovery_ivl * options.rate /
- txw_max_tpdu),
+ txw_sqns = compute_sqns (txw_max_tpdu),
ambient_spm = pgm_secs (30),
heartbeat_spm[] = { pgm_msecs (100),
pgm_msecs (100),
@@ -680,5 +673,29 @@ void zmq::pgm_socket_t::process_upstream ()
errno = EAGAIN;
}
+int zmq::pgm_socket_t::compute_sqns (int tpdu_)
+{
+ // Convert rate into B/ms.
+ uint64_t rate = ((uint64_t) options.rate) / 8;
+
+ // Get recovery interval in milliseconds.
+ uint64_t interval = options.recovery_ivl_msec >= 0 ?
+ options.recovery_ivl_msec :
+ options.recovery_ivl * 1000;
+
+ // Compute the size of the buffer in bytes.
+ uint64_t size = interval * rate;
+
+ // Translate the size into number of packets.
+ uint64_t sqns = size / tpdu_;
+ zmq_assert (sqns >= 0);
+
+ // Buffer should be able to contain at least one packet.
+ if (sqns == 0)
+ sqns = 1;
+
+ return sqns;
+}
+
#endif
diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp
index e6216de..dad8d6d 100644
--- a/src/pgm_socket.hpp
+++ b/src/pgm_socket.hpp
@@ -80,6 +80,9 @@ namespace zmq
void process_upstream ();
private:
+
+ // Compute size of the buffer based on rate and recovery interval.
+ int compute_sqns (int tpdu_);
// OpenPGM transport.
pgm_sock_t* sock;