diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/options.cpp | 8 | ||||
-rw-r--r-- | src/options.hpp | 4 | ||||
-rw-r--r-- | src/pipe.cpp | 11 | ||||
-rw-r--r-- | src/pipe.hpp | 17 | ||||
-rw-r--r-- | src/socket_base.cpp | 2 |
5 files changed, 21 insertions, 21 deletions
diff --git a/src/options.cpp b/src/options.cpp index 13332da..39f8984 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -50,11 +50,11 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, switch (option_) { case ZMQ_HWM: - if (optvallen_ != sizeof (uint64_t)) { + if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) { errno = EINVAL; return -1; } - hwm = *((uint64_t*) optval_); + hwm = *((int*) optval_); return 0; case ZMQ_AFFINITY: @@ -169,11 +169,11 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) switch (option_) { case ZMQ_HWM: - if (*optvallen_ < sizeof (uint64_t)) { + if (*optvallen_ < sizeof (int)) { errno = EINVAL; return -1; } - *((uint64_t*) optval_) = hwm; + *((int*) optval_) = hwm; *optvallen_ = sizeof (uint64_t); return 0; diff --git a/src/options.hpp b/src/options.hpp index d039554..971e643 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -35,7 +35,9 @@ namespace zmq int setsockopt (int option_, const void *optval_, size_t optvallen_); int getsockopt (int option_, void *optval_, size_t *optvallen_); - uint64_t hwm; + // High-water mark for messages in pipe. + int hwm; + uint64_t affinity; blob_t identity; diff --git a/src/pipe.cpp b/src/pipe.cpp index f09dea4..2af2dc2 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -25,8 +25,7 @@ #include "pipe.hpp" #include "likely.hpp" -zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, - uint64_t lwm_) : +zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, int lwm_) : object_t (parent_), active (true), pipe (pipe_), @@ -163,7 +162,7 @@ void zmq::reader_t::process_pipe_term_ack () } zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_, - uint64_t hwm_) : + int hwm_) : object_t (parent_), active (true), pipe (pipe_), @@ -288,11 +287,11 @@ void zmq::writer_t::process_pipe_term () bool zmq::writer_t::pipe_full () { - return hwm > 0 && msgs_written - msgs_read == hwm; + return hwm > 0 && msgs_written - msgs_read == uint64_t (hwm); } void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_, reader_t **reader_, writer_t **writer_) + int hwm_, reader_t **reader_, writer_t **writer_) { // First compute the low water mark. Following point should be taken // into consideration: @@ -314,7 +313,7 @@ void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_, // That done, we still we have to account for the cases where // HWM < max_wm_delta thus driving LWM to negative numbers. // Let's make LWM 1/2 of HWM in such cases. - uint64_t lwm = (hwm_ > max_wm_delta * 2) ? + int lwm = (hwm_ > max_wm_delta * 2) ? hwm_ - max_wm_delta : (hwm_ + 1) / 2; // Create all three objects pipe consists of: the pipe per se, reader and diff --git a/src/pipe.hpp b/src/pipe.hpp index ed13478..3230d02 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -23,19 +23,18 @@ #include "../include/zmq.h" -#include "stdint.hpp" #include "array.hpp" #include "ypipe.hpp" #include "config.hpp" #include "object.hpp" +#include "stdint.hpp" namespace zmq { // Creates a pipe. Returns pointer to reader and writer objects. void create_pipe (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_, class reader_t **reader_, - class writer_t **writer_); + int hwm_, class reader_t **reader_, class writer_t **writer_); // The shutdown mechanism for pipe works as follows: Either endpoint // (or even both of them) can ask pipe to terminate by calling 'terminate' @@ -57,7 +56,7 @@ namespace zmq class reader_t : public object_t, public array_item_t { - friend void create_pipe (object_t*, object_t*, uint64_t, + friend void create_pipe (object_t*, object_t*, int, reader_t**, writer_t**); friend class writer_t; @@ -77,7 +76,7 @@ namespace zmq private: - reader_t (class object_t *parent_, pipe_t *pipe_, uint64_t lwm_); + reader_t (class object_t *parent_, pipe_t *pipe_, int lwm_); ~reader_t (); // To be called only by writer itself! @@ -100,7 +99,7 @@ namespace zmq class writer_t *writer; // Low watermark for in-memory storage (in bytes). - uint64_t lwm; + int lwm; // Number of messages read so far. uint64_t msgs_read; @@ -126,7 +125,7 @@ namespace zmq class writer_t : public object_t, public array_item_t { - friend void create_pipe (object_t*, object_t*, uint64_t, + friend void create_pipe (object_t*, object_t*, int, reader_t**, writer_t**); public: @@ -155,7 +154,7 @@ namespace zmq private: writer_t (class object_t *parent_, pipe_t *pipe_, reader_t *reader_, - uint64_t hwm_); + int hwm_); ~writer_t (); // Command handlers. @@ -175,7 +174,7 @@ namespace zmq reader_t *reader; // High watermark for in-memory storage (in bytes). - uint64_t hwm; + int hwm; // Last confirmed number of messages read from the pipe. // The actual number can be higher. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 351e3c5..374e342 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -377,7 +377,7 @@ int zmq::socket_base_t::connect (const char *addr_) // The total HWM for an inproc connection should be the sum of // the binder's HWM and the connector's HWM. - int64_t hwm; + int hwm; if (options.hwm == 0 || peer.options.hwm == 0) hwm = 0; else |