summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-05-25 15:03:57 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-05-25 15:03:57 +0200
commit8408ae066dce123fc93e4f53dbadb1f60b7f2e8a (patch)
tree414194ee2bb2cf5eb0937ffb872e27c5e8656e03 /src
parentf34a468a263c7b4013a267297ee7f121e12dfb9d (diff)
LWM is computed rather than explicitly specified by user
Diffstat (limited to 'src')
-rw-r--r--src/config.hpp5
-rw-r--r--src/options.cpp18
-rw-r--r--src/options.hpp1
-rw-r--r--src/pipe.cpp35
-rw-r--r--src/pipe.hpp4
-rw-r--r--src/session.cpp6
-rw-r--r--src/socket_base.cpp12
7 files changed, 45 insertions, 36 deletions
diff --git a/src/config.hpp b/src/config.hpp
index e211f34..2c0ac2d 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -58,7 +58,10 @@ namespace zmq
// So, if there are 10 messages that fit into the batch size, all of
// them may be written by a single 'send' system call, thus avoiding
// unnecessary network stack traversals.
- out_batch_size = 8192,
+ out_batch_size = 8192,
+
+ // Maximal delta between high and low watermark.
+ max_wm_delta = 1024,
// Maximum number of events the I/O thread can process in one go.
max_io_events = 256,
diff --git a/src/options.cpp b/src/options.cpp
index c13488f..f6b24d6 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -26,7 +26,6 @@
zmq::options_t::options_t () :
hwm (0),
- lwm (0),
swap (0),
affinity (0),
rate (100),
@@ -53,14 +52,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
hwm = *((uint64_t*) optval_);
return 0;
- case ZMQ_LWM:
- if (optvallen_ != sizeof (uint64_t)) {
- errno = EINVAL;
- return -1;
- }
- lwm = *((uint64_t*) optval_);
- return 0;
-
case ZMQ_SWAP:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
@@ -155,15 +146,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (uint64_t);
return 0;
- case ZMQ_LWM:
- if (*optvallen_ < sizeof (uint64_t)) {
- errno = EINVAL;
- return -1;
- }
- *((uint64_t*) optval_) = lwm;
- *optvallen_ = sizeof (uint64_t);
- return 0;
-
case ZMQ_SWAP:
if (*optvallen_ < sizeof (int64_t)) {
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index fd92e3f..908b166 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -35,7 +35,6 @@ namespace zmq
int getsockopt (int option_, void *optval_, size_t *optvallen_);
uint64_t hwm;
- uint64_t lwm;
int64_t swap;
uint64_t affinity;
blob_t identity;
diff --git a/src/pipe.cpp b/src/pipe.cpp
index f592f8c..1df64e9 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -233,9 +233,9 @@ bool zmq::writer_t::pipe_full ()
}
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
- uint64_t hwm_, uint64_t lwm_) :
- reader (reader_parent_, hwm_, lwm_),
- writer (writer_parent_, hwm_, lwm_)
+ uint64_t hwm_) :
+ reader (reader_parent_, hwm_, compute_lwm (hwm_)),
+ writer (writer_parent_, hwm_, compute_lwm (hwm_))
{
reader.set_pipe (this);
writer.set_pipe (this);
@@ -250,3 +250,32 @@ zmq::pipe_t::~pipe_t ()
while (read (&msg))
zmq_msg_close (&msg);
}
+
+uint64_t zmq::pipe_t::compute_lwm (uint64_t hwm_)
+{
+ // Following point should be taken into consideration when computing
+ // low watermark:
+ //
+ // 1. LWM has to be less than HWM.
+ // 2. LWM cannot be set to very low value (such as zero) as after filling
+ // the queue it would start to refill only after all the messages are
+ // read from it and thus unnecessarily hold the progress back.
+ // 3. LWM cannot be set to very high value (such as HWM-1) as it would
+ // result in lock-step filling of the queue - if a single message is read
+ // from a full queue, writer thread is resumed to write exactly one
+ // message to the queue and go back to sleep immediately. This would
+ // result in low performance.
+ //
+ // Given the 3. it would be good to keep HWM and LWM as far apart as
+ // possible to reduce the thread switching overhead to almost zero,
+ // say HWM-LWM should be 500 (max_wm_delta).
+ //
+ // That done, we still we have to account for the cases where HWM<500 thus
+ // driving LWM to negative numbers. Let's make LWM 1/2 of HWM in such cases.
+
+ if (hwm_ > max_wm_delta * 2)
+ return hwm_ - max_wm_delta;
+ else
+ return hwm_ / 2;
+}
+
diff --git a/src/pipe.hpp b/src/pipe.hpp
index b0428b5..9f57653 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -150,7 +150,7 @@ namespace zmq
public:
pipe_t (object_t *reader_parent_, object_t *writer_parent_,
- uint64_t hwm_, uint64_t lwm_);
+ uint64_t hwm_);
~pipe_t ();
reader_t reader;
@@ -158,6 +158,8 @@ namespace zmq
private:
+ uint64_t compute_lwm (uint64_t hwm_);
+
pipe_t (const pipe_t&);
void operator = (const pipe_t&);
};
diff --git a/src/session.cpp b/src/session.cpp
index 792d763..3cd27fb 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -265,8 +265,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
writer_t *socket_writer = NULL;
if (options.requires_in && !out_pipe) {
- pipe_t *pipe = new (std::nothrow) pipe_t (owner, this,
- options.hwm, options.lwm);
+ pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm);
zmq_assert (pipe);
out_pipe = &pipe->writer;
out_pipe->set_endpoint (this);
@@ -274,8 +273,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
}
if (options.requires_out && !in_pipe) {
- pipe_t *pipe = new (std::nothrow) pipe_t (this, owner,
- options.hwm, options.lwm);
+ pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm);
zmq_assert (pipe);
in_pipe = &pipe->reader;
in_pipe->set_endpoint (this);
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index e946526..eddb297 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -194,15 +194,13 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
if (options.requires_in) {
- in_pipe = new (std::nothrow) pipe_t (this, peer,
- options.hwm, options.lwm);
+ in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm);
zmq_assert (in_pipe);
}
// Create outbound pipe, if required.
if (options.requires_out) {
- out_pipe = new (std::nothrow) pipe_t (peer, this,
- options.hwm, options.lwm);
+ out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm);
zmq_assert (out_pipe);
}
@@ -235,16 +233,14 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
if (options.requires_in) {
- in_pipe = new (std::nothrow) pipe_t (this, session,
- options.hwm, options.lwm);
+ in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm);
zmq_assert (in_pipe);
}
// Create outbound pipe, if required.
if (options.requires_out) {
- out_pipe = new (std::nothrow) pipe_t (session, this,
- options.hwm, options.lwm);
+ out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm);
zmq_assert (out_pipe);
}