diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2010-05-25 15:03:57 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-05-25 15:03:57 +0200 |
commit | 8408ae066dce123fc93e4f53dbadb1f60b7f2e8a (patch) | |
tree | 414194ee2bb2cf5eb0937ffb872e27c5e8656e03 /src | |
parent | f34a468a263c7b4013a267297ee7f121e12dfb9d (diff) |
LWM is computed rather than explicitly specified by user
Diffstat (limited to 'src')
-rw-r--r-- | src/config.hpp | 5 | ||||
-rw-r--r-- | src/options.cpp | 18 | ||||
-rw-r--r-- | src/options.hpp | 1 | ||||
-rw-r--r-- | src/pipe.cpp | 35 | ||||
-rw-r--r-- | src/pipe.hpp | 4 | ||||
-rw-r--r-- | src/session.cpp | 6 | ||||
-rw-r--r-- | src/socket_base.cpp | 12 |
7 files changed, 45 insertions, 36 deletions
diff --git a/src/config.hpp b/src/config.hpp index e211f34..2c0ac2d 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -58,7 +58,10 @@ namespace zmq // So, if there are 10 messages that fit into the batch size, all of // them may be written by a single 'send' system call, thus avoiding // unnecessary network stack traversals. - out_batch_size = 8192, + out_batch_size = 8192, + + // Maximal delta between high and low watermark. + max_wm_delta = 1024, // Maximum number of events the I/O thread can process in one go. max_io_events = 256, diff --git a/src/options.cpp b/src/options.cpp index c13488f..f6b24d6 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -26,7 +26,6 @@ zmq::options_t::options_t () : hwm (0), - lwm (0), swap (0), affinity (0), rate (100), @@ -53,14 +52,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, hwm = *((uint64_t*) optval_); return 0; - case ZMQ_LWM: - if (optvallen_ != sizeof (uint64_t)) { - errno = EINVAL; - return -1; - } - lwm = *((uint64_t*) optval_); - return 0; - case ZMQ_SWAP: if (optvallen_ != sizeof (int64_t)) { errno = EINVAL; @@ -155,15 +146,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (uint64_t); return 0; - case ZMQ_LWM: - if (*optvallen_ < sizeof (uint64_t)) { - errno = EINVAL; - return -1; - } - *((uint64_t*) optval_) = lwm; - *optvallen_ = sizeof (uint64_t); - return 0; - case ZMQ_SWAP: if (*optvallen_ < sizeof (int64_t)) { errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index fd92e3f..908b166 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -35,7 +35,6 @@ namespace zmq int getsockopt (int option_, void *optval_, size_t *optvallen_); uint64_t hwm; - uint64_t lwm; int64_t swap; uint64_t affinity; blob_t identity; diff --git a/src/pipe.cpp b/src/pipe.cpp index f592f8c..1df64e9 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -233,9 +233,9 @@ bool zmq::writer_t::pipe_full () } zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_, uint64_t lwm_) : - reader (reader_parent_, hwm_, lwm_), - writer (writer_parent_, hwm_, lwm_) + uint64_t hwm_) : + reader (reader_parent_, hwm_, compute_lwm (hwm_)), + writer (writer_parent_, hwm_, compute_lwm (hwm_)) { reader.set_pipe (this); writer.set_pipe (this); @@ -250,3 +250,32 @@ zmq::pipe_t::~pipe_t () while (read (&msg)) zmq_msg_close (&msg); } + +uint64_t zmq::pipe_t::compute_lwm (uint64_t hwm_) +{ + // Following point should be taken into consideration when computing + // low watermark: + // + // 1. LWM has to be less than HWM. + // 2. LWM cannot be set to very low value (such as zero) as after filling + // the queue it would start to refill only after all the messages are + // read from it and thus unnecessarily hold the progress back. + // 3. LWM cannot be set to very high value (such as HWM-1) as it would + // result in lock-step filling of the queue - if a single message is read + // from a full queue, writer thread is resumed to write exactly one + // message to the queue and go back to sleep immediately. This would + // result in low performance. + // + // Given the 3. it would be good to keep HWM and LWM as far apart as + // possible to reduce the thread switching overhead to almost zero, + // say HWM-LWM should be 500 (max_wm_delta). + // + // That done, we still we have to account for the cases where HWM<500 thus + // driving LWM to negative numbers. Let's make LWM 1/2 of HWM in such cases. + + if (hwm_ > max_wm_delta * 2) + return hwm_ - max_wm_delta; + else + return hwm_ / 2; +} + diff --git a/src/pipe.hpp b/src/pipe.hpp index b0428b5..9f57653 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -150,7 +150,7 @@ namespace zmq public: pipe_t (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_, uint64_t lwm_); + uint64_t hwm_); ~pipe_t (); reader_t reader; @@ -158,6 +158,8 @@ namespace zmq private: + uint64_t compute_lwm (uint64_t hwm_); + pipe_t (const pipe_t&); void operator = (const pipe_t&); }; diff --git a/src/session.cpp b/src/session.cpp index 792d763..3cd27fb 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -265,8 +265,7 @@ void zmq::session_t::process_attach (i_engine *engine_, writer_t *socket_writer = NULL; if (options.requires_in && !out_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, - options.hwm, options.lwm); + pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm); zmq_assert (pipe); out_pipe = &pipe->writer; out_pipe->set_endpoint (this); @@ -274,8 +273,7 @@ void zmq::session_t::process_attach (i_engine *engine_, } if (options.requires_out && !in_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, - options.hwm, options.lwm); + pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm); zmq_assert (pipe); in_pipe = &pipe->reader; in_pipe->set_endpoint (this); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index e946526..eddb297 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -194,15 +194,13 @@ int zmq::socket_base_t::connect (const char *addr_) // Create inbound pipe, if required. if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, peer, - options.hwm, options.lwm); + in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm); zmq_assert (in_pipe); } // Create outbound pipe, if required. if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (peer, this, - options.hwm, options.lwm); + out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm); zmq_assert (out_pipe); } @@ -235,16 +233,14 @@ int zmq::socket_base_t::connect (const char *addr_) // Create inbound pipe, if required. if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, session, - options.hwm, options.lwm); + in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm); zmq_assert (in_pipe); } // Create outbound pipe, if required. if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (session, this, - options.hwm, options.lwm); + out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm); zmq_assert (out_pipe); } |