summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-12-10 09:47:24 +0100
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-12-10 09:47:24 +0100
commit2e39f892c353851fe90261db0a0875abab50539f (patch)
treeef7a2eb32a84418f21180ec79a39f4090f784d5b /src
parent72dacc35702a14ab0bb5a2650dffbb3bbda63175 (diff)
ZMQII-27: Allow setting SNDBUF and RCVBUF size from 0MQ API (POSIX)
Diffstat (limited to 'src')
-rw-r--r--src/options.cpp18
-rw-r--r--src/options.hpp3
-rw-r--r--src/tcp_socket.cpp17
-rw-r--r--src/tcp_socket.hpp3
-rw-r--r--src/zmq_connecter_init.cpp2
-rw-r--r--src/zmq_engine.cpp8
-rw-r--r--src/zmq_engine.hpp5
-rw-r--r--src/zmq_listener_init.cpp2
8 files changed, 49 insertions, 9 deletions
diff --git a/src/options.cpp b/src/options.cpp
index 120ae7c..3f903cb 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -30,6 +30,8 @@ zmq::options_t::options_t () :
rate (100),
recovery_ivl (10),
use_multicast_loop (true),
+ sndbuf (0),
+ rcvbuf (0),
requires_in (false),
requires_out (false)
{
@@ -106,6 +108,22 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return -1;
}
return 0;
+
+ case ZMQ_SNDBUF:
+ if (optvallen_ != sizeof (uint64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ sndbuf = *((uint64_t*) optval_);
+ return 0;
+
+ case ZMQ_RCVBUF:
+ if (optvallen_ != sizeof (uint64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ rcvbuf = *((uint64_t*) optval_);
+ return 0;
}
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index a52fdeb..16bb857 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -49,6 +49,9 @@ namespace zmq
// Enable multicast loopback. Default disabled (false).
bool use_multicast_loop;
+ uint64_t sndbuf;
+ uint64_t rcvbuf;
+
// These options are never set by the user directly. Instead they are
// provided by the specific socket type.
bool requires_in;
diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp
index d3de66f..6df9ef6 100644
--- a/src/tcp_socket.cpp
+++ b/src/tcp_socket.cpp
@@ -34,7 +34,7 @@ zmq::tcp_socket_t::~tcp_socket_t ()
close ();
}
-int zmq::tcp_socket_t::open (fd_t fd_)
+int zmq::tcp_socket_t::open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_)
{
zmq_assert (s == retired_fd);
s = fd_;
@@ -129,10 +129,23 @@ zmq::tcp_socket_t::~tcp_socket_t ()
close ();
}
-int zmq::tcp_socket_t::open (fd_t fd_)
+int zmq::tcp_socket_t::open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_)
{
assert (s == retired_fd);
s = fd_;
+
+ if (sndbuf_) {
+ int sz = (int) sndbuf_;
+ int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, &sz, sizeof (int));
+ errno_assert (rc == 0);
+ }
+
+ if (rcvbuf_) {
+ int sz = (int) rcvbuf_;
+ int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, &sz, sizeof (int));
+ errno_assert (rc == 0);
+ }
+
return 0;
}
diff --git a/src/tcp_socket.hpp b/src/tcp_socket.hpp
index 406e4c0..e71a600 100644
--- a/src/tcp_socket.hpp
+++ b/src/tcp_socket.hpp
@@ -21,6 +21,7 @@
#define __ZMQ_TCP_SOCKET_HPP_INCLUDED__
#include "fd.hpp"
+#include "stdint.hpp"
namespace zmq
{
@@ -35,7 +36,7 @@ namespace zmq
~tcp_socket_t ();
// Associates a socket with a native socket descriptor.
- int open (fd_t fd_);
+ int open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_);
// Closes the underlying socket.
int close ();
diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp
index 3f165cd..ea6a8c0 100644
--- a/src/zmq_connecter_init.cpp
+++ b/src/zmq_connecter_init.cpp
@@ -31,7 +31,7 @@ zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_,
session_name (session_name_)
{
// Create associated engine object.
- engine = new zmq_engine_t (parent_, fd_);
+ engine = new zmq_engine_t (parent_, fd_, options);
zmq_assert (engine);
}
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index e8e689d..e8c9889 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -24,7 +24,8 @@
#include "config.hpp"
#include "err.hpp"
-zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :
+zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
+ const options_t &options_) :
io_object_t (parent_),
inbuf (NULL),
insize (0),
@@ -32,7 +33,8 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :
outbuf (NULL),
outsize (0),
outpos (0),
- inout (NULL)
+ inout (NULL),
+ options (options_)
{
// Allocate read & write buffer.
inbuf_storage = (unsigned char*) malloc (in_batch_size);
@@ -41,7 +43,7 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :
zmq_assert (outbuf_storage);
// Initialise the underlying socket.
- int rc = tcp_socket.open (fd_);
+ int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf);
zmq_assert (rc == 0);
}
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index ea77b7e..c842da7 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -36,7 +36,8 @@ namespace zmq
{
public:
- zmq_engine_t (class io_thread_t *parent_, fd_t fd_);
+ zmq_engine_t (class io_thread_t *parent_, fd_t fd_,
+ const options_t &options_);
~zmq_engine_t ();
// i_engine interface implementation.
@@ -71,6 +72,8 @@ namespace zmq
zmq_encoder_t encoder;
zmq_decoder_t decoder;
+ options_t options;
+
zmq_engine_t (const zmq_engine_t&);
void operator = (const zmq_engine_t&);
};
diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp
index 632bebe..0c9f0ee 100644
--- a/src/zmq_listener_init.cpp
+++ b/src/zmq_listener_init.cpp
@@ -29,7 +29,7 @@ zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_,
has_peer_identity (false)
{
// Create associated engine object.
- engine = new zmq_engine_t (parent_, fd_);
+ engine = new zmq_engine_t (parent_, fd_, options);
zmq_assert (engine);
}