summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-12-10 09:47:24 +0100
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-12-10 09:47:24 +0100
commit2e39f892c353851fe90261db0a0875abab50539f (patch)
treeef7a2eb32a84418f21180ec79a39f4090f784d5b
parent72dacc35702a14ab0bb5a2650dffbb3bbda63175 (diff)
ZMQII-27: Allow setting SNDBUF and RCVBUF size from 0MQ API (POSIX)
-rw-r--r--bindings/c/zmq.h2
-rw-r--r--bindings/cl/zeromq.lisp2
-rw-r--r--bindings/java/org/zmq/Socket.java2
-rw-r--r--bindings/python/pyzmq.cpp6
-rw-r--r--bindings/ruby/rbzmq.cpp2
-rw-r--r--man/man3/zmq_setsockopt.324
-rw-r--r--src/options.cpp18
-rw-r--r--src/options.hpp3
-rw-r--r--src/tcp_socket.cpp17
-rw-r--r--src/tcp_socket.hpp3
-rw-r--r--src/zmq_connecter_init.cpp2
-rw-r--r--src/zmq_engine.cpp8
-rw-r--r--src/zmq_engine.hpp5
-rw-r--r--src/zmq_listener_init.cpp2
14 files changed, 87 insertions, 9 deletions
diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h
index 849ddef..ae8d6b1 100644
--- a/bindings/c/zmq.h
+++ b/bindings/c/zmq.h
@@ -159,6 +159,8 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_RATE 8
#define ZMQ_RECOVERY_IVL 9
#define ZMQ_MCAST_LOOP 10
+#define ZMQ_SNDBUF 11
+#define ZMQ_RCVBUF 12
#define ZMQ_NOBLOCK 1
#define ZMQ_NOFLUSH 2
diff --git a/bindings/cl/zeromq.lisp b/bindings/cl/zeromq.lisp
index 94f7672..03befd5 100644
--- a/bindings/cl/zeromq.lisp
+++ b/bindings/cl/zeromq.lisp
@@ -158,6 +158,8 @@
(defconstant rate 8)
(defconstant recovery-ivl 9)
(defconstant mcast-loop 10)
+(defconstant sndbuf 11)
+(defconstant rcvbuf 12)
(defcfun* ("zmq_setsockopt" %setsockopt) :int
(s :pointer)
diff --git a/bindings/java/org/zmq/Socket.java b/bindings/java/org/zmq/Socket.java
index 396a6a0..935fade 100644
--- a/bindings/java/org/zmq/Socket.java
+++ b/bindings/java/org/zmq/Socket.java
@@ -47,6 +47,8 @@ public class Socket
public static final int RATE = 8;
public static final int RECOVERY_IVL = 9;
public static final int MCAST_LOOP = 10;
+ public static final int SNDBUF = 11;
+ public static final int RCVBUF = 12;
/**
* Class constructor.
diff --git a/bindings/python/pyzmq.cpp b/bindings/python/pyzmq.cpp
index 26ca7ac..f171eab 100644
--- a/bindings/python/pyzmq.cpp
+++ b/bindings/python/pyzmq.cpp
@@ -534,6 +534,12 @@ PyMODINIT_FUNC initlibpyzmq ()
t = PyInt_FromLong (ZMQ_MCAST_LOOP);
PyDict_SetItemString (dict, "MCAST_LOOP", t);
Py_DECREF (t);
+ t = PyInt_FromLong (ZMQ_SNDBUF);
+ PyDict_SetItemString (dict, "SNDBUF", t);
+ Py_DECREF (t);
+ t = PyInt_FromLong (ZMQ_RCVBUF);
+ PyDict_SetItemString (dict, "RCVBUF", t);
+ Py_DECREF (t);
t = PyInt_FromLong (ZMQ_POLL);
PyDict_SetItemString (dict, "POLL", t);
Py_DECREF (t);
diff --git a/bindings/ruby/rbzmq.cpp b/bindings/ruby/rbzmq.cpp
index 2a26ce1..43baeef 100644
--- a/bindings/ruby/rbzmq.cpp
+++ b/bindings/ruby/rbzmq.cpp
@@ -266,6 +266,8 @@ extern "C" void Init_librbzmq ()
rb_define_global_const ("RATE", INT2NUM (ZMQ_RATE));
rb_define_global_const ("RECOVERY_IVL", INT2NUM (ZMQ_RECOVERY_IVL));
rb_define_global_const ("MCAST_LOOP", INT2NUM (ZMQ_MCAST_LOOP));
+ rb_define_global_const ("SNDBUF", INT2NUM (ZMQ_SNDBUF));
+ rb_define_global_const ("RCVBUF", INT2NUM (ZMQ_RCVBUF));
rb_define_global_const ("NOBLOCK", INT2NUM (ZMQ_NOBLOCK));
rb_define_global_const ("NOFLUSH", INT2NUM (ZMQ_NOFLUSH));
diff --git a/man/man3/zmq_setsockopt.3 b/man/man3/zmq_setsockopt.3
index a79f879..36b7f08 100644
--- a/man/man3/zmq_setsockopt.3
+++ b/man/man3/zmq_setsockopt.3
@@ -17,6 +17,7 @@ High watermark for the message pipes associated with the socket. The water
mark cannot be exceeded. If the messages don't fit into the pipe emergency
mechanisms of the particular socket type are used (block, drop etc.) If HWM
is set to zero, there are no limits for the content of the pipe.
+
Type: int64_t Unit: bytes Default: 0
.IP "\fBZMQ_LWM\fP"
@@ -24,6 +25,7 @@ Low watermark makes sense only if high watermark is defined (i.e. is non-zero).
When the emergency state is reached when messages overflow the pipe, the
emergency lasts till the size of the pipe decreases to low watermark.
At that point normal state is resumed.
+
Type: int64_t Unit: bytes Default: 0
.IP "\fBZMQ_SWAP\fP"
@@ -31,6 +33,7 @@ Swap allows the pipe to exceed high watermark. However, the data are written
to the disk rather than held in the memory. Until high watermark is
exceeded there is no disk activity involved though. The value of the option
defines maximal size of the swap file.
+
Type: int64_t Unit: bytes Default: 0
.IP "\fBZMQ_AFFINITY\fP"
@@ -41,6 +44,7 @@ fairly among the threads in the thread pool. For non-zero values, the lowest
bit corresponds to the thread 1, second lowest bit to the thread 2 etc.
Thus, value of 3 means that from now on newly created sockets will handle
I/O activity exclusively using threads no. 1 and 2.
+
Type: int64_t Unit: N/A (bitmap) Default: 0
.IP "\fBZMQ_IDENTITY\fP"
@@ -50,6 +54,7 @@ separated from other runs. However, with identity application reconnects to
existing infrastructure left by the previous run. Thus it may receive
messages that were sent in the meantime, it shares pipe limits with the
previous run etc.
+
Type: string Unit: N/A Default: NULL
.IP "\fBZMQ_SUBSCRIBE\fP"
@@ -61,6 +66,7 @@ specific topic ("x.y.z") and/or messages with specific topic prefix
the very beginning of the message. Multiple filters can be attached to
a single 'sub' socket. In that case message passes if it matches at least
one of the filters.
+
Type: string Unit: N/A Default: N/A
.IP "\fBZMQ_UNSUBSCRIBE\fP"
@@ -69,12 +75,14 @@ The filter specified must match the string passed to ZMQ_SUBSCRIBE options
exactly. If there were several instances of the same filter created,
this options removes only one of them, leaving the rest in place
and functional.
+
Type: string Unit: N/A Default: N/A
.IP "\fBZMQ_RATE\fP"
This option applies only to sending side of multicast transports (pgm & udp).
It specifies maximal outgoing data rate that an individual sender socket
can send.
+
Type: uint64_t Unit: kilobits/second Default: 100
.IP "\fBZMQ_RECOVERY_IVL\fP"
@@ -84,6 +92,7 @@ Keep in mind that large recovery intervals at high data rates result in
very large recovery buffers, meaning that you can easily overload your box
by setting say 1 minute recovery interval at 1Gb/s rate (requires
7GB in-memory buffer).
+
Type: uint64_t Unit: seconds Default: 10
.IP "\fBZMQ_MCAST_LOOP\fP"
@@ -92,8 +101,23 @@ means that the mutlicast packets can be received on the box they were sent
from. Setting the value to 0 disables the loopback functionality which
can have negative impact on the performance. If possible, disable
the loopback in production environments.
+
Type: uint64_t Unit: N/A (boolean value) Default: 1
+.IP "\fBZMQ_SNDBUF\fP"
+Sets the underlying kernel transmit buffer size to the specified size. See
+.IR SO_SNDBUF
+POSIX socket option. Value of zero means leaving the OS default unchanged.
+
+Type: uint64_t Unit: bytes Default: 0
+
+.IP "\fBZMQ_RCVBUF\fP"
+Sets the underlying kernel receive buffer size to the specified size. See
+.IR SO_RCVBUF
+POSIX socket option. Value of zero means leaving the OS default unchanged.
+
+Type: uint64_t Unit: bytes Default: 0
+
.SH RETURN VALUE
In case of success the function returns zero. Otherwise it returns -1 and
sets
diff --git a/src/options.cpp b/src/options.cpp
index 120ae7c..3f903cb 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -30,6 +30,8 @@ zmq::options_t::options_t () :
rate (100),
recovery_ivl (10),
use_multicast_loop (true),
+ sndbuf (0),
+ rcvbuf (0),
requires_in (false),
requires_out (false)
{
@@ -106,6 +108,22 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return -1;
}
return 0;
+
+ case ZMQ_SNDBUF:
+ if (optvallen_ != sizeof (uint64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ sndbuf = *((uint64_t*) optval_);
+ return 0;
+
+ case ZMQ_RCVBUF:
+ if (optvallen_ != sizeof (uint64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ rcvbuf = *((uint64_t*) optval_);
+ return 0;
}
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index a52fdeb..16bb857 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -49,6 +49,9 @@ namespace zmq
// Enable multicast loopback. Default disabled (false).
bool use_multicast_loop;
+ uint64_t sndbuf;
+ uint64_t rcvbuf;
+
// These options are never set by the user directly. Instead they are
// provided by the specific socket type.
bool requires_in;
diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp
index d3de66f..6df9ef6 100644
--- a/src/tcp_socket.cpp
+++ b/src/tcp_socket.cpp
@@ -34,7 +34,7 @@ zmq::tcp_socket_t::~tcp_socket_t ()
close ();
}
-int zmq::tcp_socket_t::open (fd_t fd_)
+int zmq::tcp_socket_t::open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_)
{
zmq_assert (s == retired_fd);
s = fd_;
@@ -129,10 +129,23 @@ zmq::tcp_socket_t::~tcp_socket_t ()
close ();
}
-int zmq::tcp_socket_t::open (fd_t fd_)
+int zmq::tcp_socket_t::open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_)
{
assert (s == retired_fd);
s = fd_;
+
+ if (sndbuf_) {
+ int sz = (int) sndbuf_;
+ int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, &sz, sizeof (int));
+ errno_assert (rc == 0);
+ }
+
+ if (rcvbuf_) {
+ int sz = (int) rcvbuf_;
+ int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, &sz, sizeof (int));
+ errno_assert (rc == 0);
+ }
+
return 0;
}
diff --git a/src/tcp_socket.hpp b/src/tcp_socket.hpp
index 406e4c0..e71a600 100644
--- a/src/tcp_socket.hpp
+++ b/src/tcp_socket.hpp
@@ -21,6 +21,7 @@
#define __ZMQ_TCP_SOCKET_HPP_INCLUDED__
#include "fd.hpp"
+#include "stdint.hpp"
namespace zmq
{
@@ -35,7 +36,7 @@ namespace zmq
~tcp_socket_t ();
// Associates a socket with a native socket descriptor.
- int open (fd_t fd_);
+ int open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_);
// Closes the underlying socket.
int close ();
diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp
index 3f165cd..ea6a8c0 100644
--- a/src/zmq_connecter_init.cpp
+++ b/src/zmq_connecter_init.cpp
@@ -31,7 +31,7 @@ zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_,
session_name (session_name_)
{
// Create associated engine object.
- engine = new zmq_engine_t (parent_, fd_);
+ engine = new zmq_engine_t (parent_, fd_, options);
zmq_assert (engine);
}
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index e8e689d..e8c9889 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -24,7 +24,8 @@
#include "config.hpp"
#include "err.hpp"
-zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :
+zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
+ const options_t &options_) :
io_object_t (parent_),
inbuf (NULL),
insize (0),
@@ -32,7 +33,8 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :
outbuf (NULL),
outsize (0),
outpos (0),
- inout (NULL)
+ inout (NULL),
+ options (options_)
{
// Allocate read & write buffer.
inbuf_storage = (unsigned char*) malloc (in_batch_size);
@@ -41,7 +43,7 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :
zmq_assert (outbuf_storage);
// Initialise the underlying socket.
- int rc = tcp_socket.open (fd_);
+ int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf);
zmq_assert (rc == 0);
}
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index ea77b7e..c842da7 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -36,7 +36,8 @@ namespace zmq
{
public:
- zmq_engine_t (class io_thread_t *parent_, fd_t fd_);
+ zmq_engine_t (class io_thread_t *parent_, fd_t fd_,
+ const options_t &options_);
~zmq_engine_t ();
// i_engine interface implementation.
@@ -71,6 +72,8 @@ namespace zmq
zmq_encoder_t encoder;
zmq_decoder_t decoder;
+ options_t options;
+
zmq_engine_t (const zmq_engine_t&);
void operator = (const zmq_engine_t&);
};
diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp
index 632bebe..0c9f0ee 100644
--- a/src/zmq_listener_init.cpp
+++ b/src/zmq_listener_init.cpp
@@ -29,7 +29,7 @@ zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_,
has_peer_identity (false)
{
// Create associated engine object.
- engine = new zmq_engine_t (parent_, fd_);
+ engine = new zmq_engine_t (parent_, fd_, options);
zmq_assert (engine);
}