summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/zmq_setsockopt.txt15
-rw-r--r--include/zmq.h1
-rw-r--r--src/config.hpp5
-rw-r--r--src/options.cpp18
-rw-r--r--src/options.hpp1
-rw-r--r--src/pipe.cpp35
-rw-r--r--src/pipe.hpp4
-rw-r--r--src/session.cpp6
-rw-r--r--src/socket_base.cpp12
9 files changed, 46 insertions, 51 deletions
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index 92068f8..8845a10 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -40,21 +40,6 @@ Default value:: 0
Applicable socket types:: all
-ZMQ_LWM: Set low water mark
-~~~~~~~~~~~~~~~~~~~~~~~~~~~
-The 'ZMQ_LWM' option shall set the low water mark for the _message queue_
-associated with the socket. This option only makes sense when used in
-conjunction with the 'ZMQ_HWM' option. A socket which has reached it's high
-water mark remains in the "emergency" state until the number of outstanding
-messages in it's associated message queue falls below the low water mark, at
-which point normal message processing is resumed.
-
-Option value type:: int64_t
-Option value unit:: messages
-Default value:: 0
-Applicable socket types:: all
-
-
ZMQ_SWAP: Set disk offload size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_SWAP' option shall set the disk offload (swap) size for the _message
diff --git a/include/zmq.h b/include/zmq.h
index 9745adf..6cdf68f 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -169,6 +169,7 @@ ZMQ_EXPORT int zmq_term (void *context);
/* Socket options. */
#define ZMQ_HWM 1
+/* TODO: LWM is obsolete and should be removed in next version. */
#define ZMQ_LWM 2
#define ZMQ_SWAP 3
#define ZMQ_AFFINITY 4
diff --git a/src/config.hpp b/src/config.hpp
index e211f34..2c0ac2d 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -58,7 +58,10 @@ namespace zmq
// So, if there are 10 messages that fit into the batch size, all of
// them may be written by a single 'send' system call, thus avoiding
// unnecessary network stack traversals.
- out_batch_size = 8192,
+ out_batch_size = 8192,
+
+ // Maximal delta between high and low watermark.
+ max_wm_delta = 1024,
// Maximum number of events the I/O thread can process in one go.
max_io_events = 256,
diff --git a/src/options.cpp b/src/options.cpp
index c13488f..f6b24d6 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -26,7 +26,6 @@
zmq::options_t::options_t () :
hwm (0),
- lwm (0),
swap (0),
affinity (0),
rate (100),
@@ -53,14 +52,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
hwm = *((uint64_t*) optval_);
return 0;
- case ZMQ_LWM:
- if (optvallen_ != sizeof (uint64_t)) {
- errno = EINVAL;
- return -1;
- }
- lwm = *((uint64_t*) optval_);
- return 0;
-
case ZMQ_SWAP:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
@@ -155,15 +146,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (uint64_t);
return 0;
- case ZMQ_LWM:
- if (*optvallen_ < sizeof (uint64_t)) {
- errno = EINVAL;
- return -1;
- }
- *((uint64_t*) optval_) = lwm;
- *optvallen_ = sizeof (uint64_t);
- return 0;
-
case ZMQ_SWAP:
if (*optvallen_ < sizeof (int64_t)) {
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index fd92e3f..908b166 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -35,7 +35,6 @@ namespace zmq
int getsockopt (int option_, void *optval_, size_t *optvallen_);
uint64_t hwm;
- uint64_t lwm;
int64_t swap;
uint64_t affinity;
blob_t identity;
diff --git a/src/pipe.cpp b/src/pipe.cpp
index f592f8c..1df64e9 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -233,9 +233,9 @@ bool zmq::writer_t::pipe_full ()
}
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
- uint64_t hwm_, uint64_t lwm_) :
- reader (reader_parent_, hwm_, lwm_),
- writer (writer_parent_, hwm_, lwm_)
+ uint64_t hwm_) :
+ reader (reader_parent_, hwm_, compute_lwm (hwm_)),
+ writer (writer_parent_, hwm_, compute_lwm (hwm_))
{
reader.set_pipe (this);
writer.set_pipe (this);
@@ -250,3 +250,32 @@ zmq::pipe_t::~pipe_t ()
while (read (&msg))
zmq_msg_close (&msg);
}
+
+uint64_t zmq::pipe_t::compute_lwm (uint64_t hwm_)
+{
+ // Following point should be taken into consideration when computing
+ // low watermark:
+ //
+ // 1. LWM has to be less than HWM.
+ // 2. LWM cannot be set to very low value (such as zero) as after filling
+ // the queue it would start to refill only after all the messages are
+ // read from it and thus unnecessarily hold the progress back.
+ // 3. LWM cannot be set to very high value (such as HWM-1) as it would
+ // result in lock-step filling of the queue - if a single message is read
+ // from a full queue, writer thread is resumed to write exactly one
+ // message to the queue and go back to sleep immediately. This would
+ // result in low performance.
+ //
+ // Given the 3. it would be good to keep HWM and LWM as far apart as
+ // possible to reduce the thread switching overhead to almost zero,
+ // say HWM-LWM should be 500 (max_wm_delta).
+ //
+ // That done, we still we have to account for the cases where HWM<500 thus
+ // driving LWM to negative numbers. Let's make LWM 1/2 of HWM in such cases.
+
+ if (hwm_ > max_wm_delta * 2)
+ return hwm_ - max_wm_delta;
+ else
+ return hwm_ / 2;
+}
+
diff --git a/src/pipe.hpp b/src/pipe.hpp
index b0428b5..9f57653 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -150,7 +150,7 @@ namespace zmq
public:
pipe_t (object_t *reader_parent_, object_t *writer_parent_,
- uint64_t hwm_, uint64_t lwm_);
+ uint64_t hwm_);
~pipe_t ();
reader_t reader;
@@ -158,6 +158,8 @@ namespace zmq
private:
+ uint64_t compute_lwm (uint64_t hwm_);
+
pipe_t (const pipe_t&);
void operator = (const pipe_t&);
};
diff --git a/src/session.cpp b/src/session.cpp
index 792d763..3cd27fb 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -265,8 +265,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
writer_t *socket_writer = NULL;
if (options.requires_in && !out_pipe) {
- pipe_t *pipe = new (std::nothrow) pipe_t (owner, this,
- options.hwm, options.lwm);
+ pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm);
zmq_assert (pipe);
out_pipe = &pipe->writer;
out_pipe->set_endpoint (this);
@@ -274,8 +273,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
}
if (options.requires_out && !in_pipe) {
- pipe_t *pipe = new (std::nothrow) pipe_t (this, owner,
- options.hwm, options.lwm);
+ pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm);
zmq_assert (pipe);
in_pipe = &pipe->reader;
in_pipe->set_endpoint (this);
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index e946526..eddb297 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -194,15 +194,13 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
if (options.requires_in) {
- in_pipe = new (std::nothrow) pipe_t (this, peer,
- options.hwm, options.lwm);
+ in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm);
zmq_assert (in_pipe);
}
// Create outbound pipe, if required.
if (options.requires_out) {
- out_pipe = new (std::nothrow) pipe_t (peer, this,
- options.hwm, options.lwm);
+ out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm);
zmq_assert (out_pipe);
}
@@ -235,16 +233,14 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
if (options.requires_in) {
- in_pipe = new (std::nothrow) pipe_t (this, session,
- options.hwm, options.lwm);
+ in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm);
zmq_assert (in_pipe);
}
// Create outbound pipe, if required.
if (options.requires_out) {
- out_pipe = new (std::nothrow) pipe_t (session, this,
- options.hwm, options.lwm);
+ out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm);
zmq_assert (out_pipe);
}