From 8408ae066dce123fc93e4f53dbadb1f60b7f2e8a Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 25 May 2010 15:03:57 +0200 Subject: LWM is computed rather than explicitly specified by user --- doc/zmq_setsockopt.txt | 15 --------------- include/zmq.h | 1 + src/config.hpp | 5 ++++- src/options.cpp | 18 ------------------ src/options.hpp | 1 - src/pipe.cpp | 35 ++++++++++++++++++++++++++++++++--- src/pipe.hpp | 4 +++- src/session.cpp | 6 ++---- src/socket_base.cpp | 12 ++++-------- 9 files changed, 46 insertions(+), 51 deletions(-) diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 92068f8..8845a10 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -40,21 +40,6 @@ Default value:: 0 Applicable socket types:: all -ZMQ_LWM: Set low water mark -~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_LWM' option shall set the low water mark for the _message queue_ -associated with the socket. This option only makes sense when used in -conjunction with the 'ZMQ_HWM' option. A socket which has reached it's high -water mark remains in the "emergency" state until the number of outstanding -messages in it's associated message queue falls below the low water mark, at -which point normal message processing is resumed. - -Option value type:: int64_t -Option value unit:: messages -Default value:: 0 -Applicable socket types:: all - - ZMQ_SWAP: Set disk offload size ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_SWAP' option shall set the disk offload (swap) size for the _message diff --git a/include/zmq.h b/include/zmq.h index 9745adf..6cdf68f 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -169,6 +169,7 @@ ZMQ_EXPORT int zmq_term (void *context); /* Socket options. */ #define ZMQ_HWM 1 +/* TODO: LWM is obsolete and should be removed in next version. */ #define ZMQ_LWM 2 #define ZMQ_SWAP 3 #define ZMQ_AFFINITY 4 diff --git a/src/config.hpp b/src/config.hpp index e211f34..2c0ac2d 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -58,7 +58,10 @@ namespace zmq // So, if there are 10 messages that fit into the batch size, all of // them may be written by a single 'send' system call, thus avoiding // unnecessary network stack traversals. - out_batch_size = 8192, + out_batch_size = 8192, + + // Maximal delta between high and low watermark. + max_wm_delta = 1024, // Maximum number of events the I/O thread can process in one go. max_io_events = 256, diff --git a/src/options.cpp b/src/options.cpp index c13488f..f6b24d6 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -26,7 +26,6 @@ zmq::options_t::options_t () : hwm (0), - lwm (0), swap (0), affinity (0), rate (100), @@ -53,14 +52,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, hwm = *((uint64_t*) optval_); return 0; - case ZMQ_LWM: - if (optvallen_ != sizeof (uint64_t)) { - errno = EINVAL; - return -1; - } - lwm = *((uint64_t*) optval_); - return 0; - case ZMQ_SWAP: if (optvallen_ != sizeof (int64_t)) { errno = EINVAL; @@ -155,15 +146,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (uint64_t); return 0; - case ZMQ_LWM: - if (*optvallen_ < sizeof (uint64_t)) { - errno = EINVAL; - return -1; - } - *((uint64_t*) optval_) = lwm; - *optvallen_ = sizeof (uint64_t); - return 0; - case ZMQ_SWAP: if (*optvallen_ < sizeof (int64_t)) { errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index fd92e3f..908b166 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -35,7 +35,6 @@ namespace zmq int getsockopt (int option_, void *optval_, size_t *optvallen_); uint64_t hwm; - uint64_t lwm; int64_t swap; uint64_t affinity; blob_t identity; diff --git a/src/pipe.cpp b/src/pipe.cpp index f592f8c..1df64e9 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -233,9 +233,9 @@ bool zmq::writer_t::pipe_full () } zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_, uint64_t lwm_) : - reader (reader_parent_, hwm_, lwm_), - writer (writer_parent_, hwm_, lwm_) + uint64_t hwm_) : + reader (reader_parent_, hwm_, compute_lwm (hwm_)), + writer (writer_parent_, hwm_, compute_lwm (hwm_)) { reader.set_pipe (this); writer.set_pipe (this); @@ -250,3 +250,32 @@ zmq::pipe_t::~pipe_t () while (read (&msg)) zmq_msg_close (&msg); } + +uint64_t zmq::pipe_t::compute_lwm (uint64_t hwm_) +{ + // Following point should be taken into consideration when computing + // low watermark: + // + // 1. LWM has to be less than HWM. + // 2. LWM cannot be set to very low value (such as zero) as after filling + // the queue it would start to refill only after all the messages are + // read from it and thus unnecessarily hold the progress back. + // 3. LWM cannot be set to very high value (such as HWM-1) as it would + // result in lock-step filling of the queue - if a single message is read + // from a full queue, writer thread is resumed to write exactly one + // message to the queue and go back to sleep immediately. This would + // result in low performance. + // + // Given the 3. it would be good to keep HWM and LWM as far apart as + // possible to reduce the thread switching overhead to almost zero, + // say HWM-LWM should be 500 (max_wm_delta). + // + // That done, we still we have to account for the cases where HWM<500 thus + // driving LWM to negative numbers. Let's make LWM 1/2 of HWM in such cases. + + if (hwm_ > max_wm_delta * 2) + return hwm_ - max_wm_delta; + else + return hwm_ / 2; +} + diff --git a/src/pipe.hpp b/src/pipe.hpp index b0428b5..9f57653 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -150,7 +150,7 @@ namespace zmq public: pipe_t (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_, uint64_t lwm_); + uint64_t hwm_); ~pipe_t (); reader_t reader; @@ -158,6 +158,8 @@ namespace zmq private: + uint64_t compute_lwm (uint64_t hwm_); + pipe_t (const pipe_t&); void operator = (const pipe_t&); }; diff --git a/src/session.cpp b/src/session.cpp index 792d763..3cd27fb 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -265,8 +265,7 @@ void zmq::session_t::process_attach (i_engine *engine_, writer_t *socket_writer = NULL; if (options.requires_in && !out_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, - options.hwm, options.lwm); + pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm); zmq_assert (pipe); out_pipe = &pipe->writer; out_pipe->set_endpoint (this); @@ -274,8 +273,7 @@ void zmq::session_t::process_attach (i_engine *engine_, } if (options.requires_out && !in_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, - options.hwm, options.lwm); + pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm); zmq_assert (pipe); in_pipe = &pipe->reader; in_pipe->set_endpoint (this); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index e946526..eddb297 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -194,15 +194,13 @@ int zmq::socket_base_t::connect (const char *addr_) // Create inbound pipe, if required. if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, peer, - options.hwm, options.lwm); + in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm); zmq_assert (in_pipe); } // Create outbound pipe, if required. if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (peer, this, - options.hwm, options.lwm); + out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm); zmq_assert (out_pipe); } @@ -235,16 +233,14 @@ int zmq::socket_base_t::connect (const char *addr_) // Create inbound pipe, if required. if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, session, - options.hwm, options.lwm); + in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm); zmq_assert (in_pipe); } // Create outbound pipe, if required. if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (session, this, - options.hwm, options.lwm); + out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm); zmq_assert (out_pipe); } -- cgit v1.2.3