From 2e39f892c353851fe90261db0a0875abab50539f Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 10 Dec 2009 09:47:24 +0100 Subject: ZMQII-27: Allow setting SNDBUF and RCVBUF size from 0MQ API (POSIX) --- src/options.cpp | 18 ++++++++++++++++++ src/options.hpp | 3 +++ src/tcp_socket.cpp | 17 +++++++++++++++-- src/tcp_socket.hpp | 3 ++- src/zmq_connecter_init.cpp | 2 +- src/zmq_engine.cpp | 8 +++++--- src/zmq_engine.hpp | 5 ++++- src/zmq_listener_init.cpp | 2 +- 8 files changed, 49 insertions(+), 9 deletions(-) (limited to 'src') diff --git a/src/options.cpp b/src/options.cpp index 120ae7c..3f903cb 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -30,6 +30,8 @@ zmq::options_t::options_t () : rate (100), recovery_ivl (10), use_multicast_loop (true), + sndbuf (0), + rcvbuf (0), requires_in (false), requires_out (false) { @@ -106,6 +108,22 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, return -1; } return 0; + + case ZMQ_SNDBUF: + if (optvallen_ != sizeof (uint64_t)) { + errno = EINVAL; + return -1; + } + sndbuf = *((uint64_t*) optval_); + return 0; + + case ZMQ_RCVBUF: + if (optvallen_ != sizeof (uint64_t)) { + errno = EINVAL; + return -1; + } + rcvbuf = *((uint64_t*) optval_); + return 0; } errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index a52fdeb..16bb857 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -49,6 +49,9 @@ namespace zmq // Enable multicast loopback. Default disabled (false). bool use_multicast_loop; + uint64_t sndbuf; + uint64_t rcvbuf; + // These options are never set by the user directly. Instead they are // provided by the specific socket type. bool requires_in; diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp index d3de66f..6df9ef6 100644 --- a/src/tcp_socket.cpp +++ b/src/tcp_socket.cpp @@ -34,7 +34,7 @@ zmq::tcp_socket_t::~tcp_socket_t () close (); } -int zmq::tcp_socket_t::open (fd_t fd_) +int zmq::tcp_socket_t::open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_) { zmq_assert (s == retired_fd); s = fd_; @@ -129,10 +129,23 @@ zmq::tcp_socket_t::~tcp_socket_t () close (); } -int zmq::tcp_socket_t::open (fd_t fd_) +int zmq::tcp_socket_t::open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_) { assert (s == retired_fd); s = fd_; + + if (sndbuf_) { + int sz = (int) sndbuf_; + int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, &sz, sizeof (int)); + errno_assert (rc == 0); + } + + if (rcvbuf_) { + int sz = (int) rcvbuf_; + int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, &sz, sizeof (int)); + errno_assert (rc == 0); + } + return 0; } diff --git a/src/tcp_socket.hpp b/src/tcp_socket.hpp index 406e4c0..e71a600 100644 --- a/src/tcp_socket.hpp +++ b/src/tcp_socket.hpp @@ -21,6 +21,7 @@ #define __ZMQ_TCP_SOCKET_HPP_INCLUDED__ #include "fd.hpp" +#include "stdint.hpp" namespace zmq { @@ -35,7 +36,7 @@ namespace zmq ~tcp_socket_t (); // Associates a socket with a native socket descriptor. - int open (fd_t fd_); + int open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_); // Closes the underlying socket. int close (); diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp index 3f165cd..ea6a8c0 100644 --- a/src/zmq_connecter_init.cpp +++ b/src/zmq_connecter_init.cpp @@ -31,7 +31,7 @@ zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_, session_name (session_name_) { // Create associated engine object. - engine = new zmq_engine_t (parent_, fd_); + engine = new zmq_engine_t (parent_, fd_, options); zmq_assert (engine); } diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index e8e689d..e8c9889 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -24,7 +24,8 @@ #include "config.hpp" #include "err.hpp" -zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) : +zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, + const options_t &options_) : io_object_t (parent_), inbuf (NULL), insize (0), @@ -32,7 +33,8 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) : outbuf (NULL), outsize (0), outpos (0), - inout (NULL) + inout (NULL), + options (options_) { // Allocate read & write buffer. inbuf_storage = (unsigned char*) malloc (in_batch_size); @@ -41,7 +43,7 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) : zmq_assert (outbuf_storage); // Initialise the underlying socket. - int rc = tcp_socket.open (fd_); + int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf); zmq_assert (rc == 0); } diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index ea77b7e..c842da7 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -36,7 +36,8 @@ namespace zmq { public: - zmq_engine_t (class io_thread_t *parent_, fd_t fd_); + zmq_engine_t (class io_thread_t *parent_, fd_t fd_, + const options_t &options_); ~zmq_engine_t (); // i_engine interface implementation. @@ -71,6 +72,8 @@ namespace zmq zmq_encoder_t encoder; zmq_decoder_t decoder; + options_t options; + zmq_engine_t (const zmq_engine_t&); void operator = (const zmq_engine_t&); }; diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp index 632bebe..0c9f0ee 100644 --- a/src/zmq_listener_init.cpp +++ b/src/zmq_listener_init.cpp @@ -29,7 +29,7 @@ zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_, has_peer_identity (false) { // Create associated engine object. - engine = new zmq_engine_t (parent_, fd_); + engine = new zmq_engine_t (parent_, fd_, options); zmq_assert (engine); } -- cgit v1.2.3