diff options
-rw-r--r-- | doc/zmq_getsockopt.txt | 4 | ||||
-rw-r--r-- | doc/zmq_setsockopt.txt | 2 | ||||
-rw-r--r-- | src/options.cpp | 8 | ||||
-rw-r--r-- | src/options.hpp | 4 | ||||
-rw-r--r-- | src/pipe.cpp | 11 | ||||
-rw-r--r-- | src/pipe.hpp | 17 | ||||
-rw-r--r-- | src/socket_base.cpp | 2 | ||||
-rw-r--r-- | tests/test_hwm.cpp | 3 |
8 files changed, 25 insertions, 26 deletions
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index 88a4071..e23eaac 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -73,7 +73,7 @@ type. The default 'ZMQ_HWM' value of zero means "no limit". [horizontal] -Option value type:: uint64_t +Option value type:: int Option value unit:: messages Default value:: 0 Applicable socket types:: all @@ -348,7 +348,7 @@ EXAMPLE .Retrieving the high water mark ---- /* Retrieve high water mark into hwm */ -int64_t hwm; +int hwm; size_t hwm_size = sizeof (hwm); rc = zmq_getsockopt (socket, ZMQ_HWM, &hwm, &hwm_size); assert (rc == 0); diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index f486305..b16bab4 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -41,7 +41,7 @@ type. The default 'ZMQ_HWM' value of zero means "no limit". [horizontal] -Option value type:: uint64_t +Option value type:: int Option value unit:: messages Default value:: 0 Applicable socket types:: all diff --git a/src/options.cpp b/src/options.cpp index 13332da..39f8984 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -50,11 +50,11 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, switch (option_) { case ZMQ_HWM: - if (optvallen_ != sizeof (uint64_t)) { + if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) { errno = EINVAL; return -1; } - hwm = *((uint64_t*) optval_); + hwm = *((int*) optval_); return 0; case ZMQ_AFFINITY: @@ -169,11 +169,11 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) switch (option_) { case ZMQ_HWM: - if (*optvallen_ < sizeof (uint64_t)) { + if (*optvallen_ < sizeof (int)) { errno = EINVAL; return -1; } - *((uint64_t*) optval_) = hwm; + *((int*) optval_) = hwm; *optvallen_ = sizeof (uint64_t); return 0; diff --git a/src/options.hpp b/src/options.hpp index d039554..971e643 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -35,7 +35,9 @@ namespace zmq int setsockopt (int option_, const void *optval_, size_t optvallen_); int getsockopt (int option_, void *optval_, size_t *optvallen_); - uint64_t hwm; + // High-water mark for messages in pipe. + int hwm; + uint64_t affinity; blob_t identity; diff --git a/src/pipe.cpp b/src/pipe.cpp index f09dea4..2af2dc2 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -25,8 +25,7 @@ #include "pipe.hpp" #include "likely.hpp" -zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, - uint64_t lwm_) : +zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, int lwm_) : object_t (parent_), active (true), pipe (pipe_), @@ -163,7 +162,7 @@ void zmq::reader_t::process_pipe_term_ack () } zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_, - uint64_t hwm_) : + int hwm_) : object_t (parent_), active (true), pipe (pipe_), @@ -288,11 +287,11 @@ void zmq::writer_t::process_pipe_term () bool zmq::writer_t::pipe_full () { - return hwm > 0 && msgs_written - msgs_read == hwm; + return hwm > 0 && msgs_written - msgs_read == uint64_t (hwm); } void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_, reader_t **reader_, writer_t **writer_) + int hwm_, reader_t **reader_, writer_t **writer_) { // First compute the low water mark. Following point should be taken // into consideration: @@ -314,7 +313,7 @@ void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_, // That done, we still we have to account for the cases where // HWM < max_wm_delta thus driving LWM to negative numbers. // Let's make LWM 1/2 of HWM in such cases. - uint64_t lwm = (hwm_ > max_wm_delta * 2) ? + int lwm = (hwm_ > max_wm_delta * 2) ? hwm_ - max_wm_delta : (hwm_ + 1) / 2; // Create all three objects pipe consists of: the pipe per se, reader and diff --git a/src/pipe.hpp b/src/pipe.hpp index ed13478..3230d02 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -23,19 +23,18 @@ #include "../include/zmq.h" -#include "stdint.hpp" #include "array.hpp" #include "ypipe.hpp" #include "config.hpp" #include "object.hpp" +#include "stdint.hpp" namespace zmq { // Creates a pipe. Returns pointer to reader and writer objects. void create_pipe (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_, class reader_t **reader_, - class writer_t **writer_); + int hwm_, class reader_t **reader_, class writer_t **writer_); // The shutdown mechanism for pipe works as follows: Either endpoint // (or even both of them) can ask pipe to terminate by calling 'terminate' @@ -57,7 +56,7 @@ namespace zmq class reader_t : public object_t, public array_item_t { - friend void create_pipe (object_t*, object_t*, uint64_t, + friend void create_pipe (object_t*, object_t*, int, reader_t**, writer_t**); friend class writer_t; @@ -77,7 +76,7 @@ namespace zmq private: - reader_t (class object_t *parent_, pipe_t *pipe_, uint64_t lwm_); + reader_t (class object_t *parent_, pipe_t *pipe_, int lwm_); ~reader_t (); // To be called only by writer itself! @@ -100,7 +99,7 @@ namespace zmq class writer_t *writer; // Low watermark for in-memory storage (in bytes). - uint64_t lwm; + int lwm; // Number of messages read so far. uint64_t msgs_read; @@ -126,7 +125,7 @@ namespace zmq class writer_t : public object_t, public array_item_t { - friend void create_pipe (object_t*, object_t*, uint64_t, + friend void create_pipe (object_t*, object_t*, int, reader_t**, writer_t**); public: @@ -155,7 +154,7 @@ namespace zmq private: writer_t (class object_t *parent_, pipe_t *pipe_, reader_t *reader_, - uint64_t hwm_); + int hwm_); ~writer_t (); // Command handlers. @@ -175,7 +174,7 @@ namespace zmq reader_t *reader; // High watermark for in-memory storage (in bytes). - uint64_t hwm; + int hwm; // Last confirmed number of messages read from the pipe. // The actual number can be higher. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 351e3c5..374e342 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -377,7 +377,7 @@ int zmq::socket_base_t::connect (const char *addr_) // The total HWM for an inproc connection should be the sum of // the binder's HWM and the connector's HWM. - int64_t hwm; + int hwm; if (options.hwm == 0 || peer.options.hwm == 0) hwm = 0; else diff --git a/tests/test_hwm.cpp b/tests/test_hwm.cpp index d579f9d..a96193d 100644 --- a/tests/test_hwm.cpp +++ b/tests/test_hwm.cpp @@ -21,7 +21,6 @@ #include <assert.h> -#include "../src/stdint.hpp" #include "testutil.hpp" int main (int argc, char *argv []) @@ -33,7 +32,7 @@ int main (int argc, char *argv []) // buffer space should be 4 messages. void *sb = zmq_socket (ctx, ZMQ_PULL); assert (sb); - uint64_t hwm = 2; + int hwm = 2; int rc = zmq_setsockopt (sb, ZMQ_HWM, &hwm, sizeof (hwm)); assert (rc == 0); rc = zmq_bind (sb, "inproc://a"); |