diff options
-rw-r--r-- | src/lb.cpp | 9 | ||||
-rw-r--r-- | src/pair.cpp | 5 | ||||
-rw-r--r-- | src/pipe.cpp | 8 | ||||
-rw-r--r-- | src/pipe.hpp | 6 | ||||
-rw-r--r-- | src/swap.cpp | 15 | ||||
-rw-r--r-- | src/swap.hpp | 8 |
6 files changed, 39 insertions, 12 deletions
@@ -128,8 +128,15 @@ bool zmq::lb_t::has_out () return true; while (active > 0) { - if (pipes [current]->check_write ()) + + // Check whether zero-sized message can be written to the pipe. + zmq_msg_t msg; + zmq_msg_init (&msg); + if (pipes [current]->check_write (&msg)) { + zmq_msg_close (&msg); return true; + } + zmq_msg_close (&msg); // Deactivate the pipe. active--; diff --git a/src/pair.cpp b/src/pair.cpp index 6442b32..ace3550 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -170,7 +170,10 @@ bool zmq::pair_t::xhas_out () if (!outpipe || !outpipe_alive) return false; - outpipe_alive = outpipe->check_write (); + zmq_msg_t msg; + zmq_msg_init (&msg); + outpipe_alive = outpipe->check_write (&msg); + zmq_msg_close (&msg); return outpipe_alive; } diff --git a/src/pipe.cpp b/src/pipe.cpp index a60e519..bbdd44e 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -200,15 +200,15 @@ void zmq::writer_t::set_event_sink (i_writer_events *sink_) sink = sink_; } -bool zmq::writer_t::check_write () +bool zmq::writer_t::check_write (zmq_msg_t *msg_) { // We've already checked and there's no space free for the new message. // There's no point in checking once again. if (unlikely (!active)) return false; - + if (unlikely (swapping)) { - if (unlikely (swap->full ())) { + if (unlikely (!swap->fits (msg_))) { active = false; return false; } @@ -229,7 +229,7 @@ bool zmq::writer_t::check_write () bool zmq::writer_t::write (zmq_msg_t *msg_) { - if (unlikely (!check_write ())) + if (unlikely (!check_write (msg_))) return false; if (unlikely (swapping)) { diff --git a/src/pipe.hpp b/src/pipe.hpp index 711a295..a956ce3 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -134,10 +134,10 @@ namespace zmq // Specifies the object to get events from the writer. void set_event_sink (i_writer_events *endpoint_); - // Checks whether a message can be written to the pipe. + // Checks whether messages can be written to the pipe. // If writing the message would cause high watermark and (optionally) - // swap to be exceeded, the function returns false. - bool check_write (); + // if the swap is full, the function returns false. + bool check_write (zmq_msg_t *msg_); // Writes a message to the underlying pipe. Returns false if the // message cannot be written because high watermark was reached. diff --git a/src/swap.cpp b/src/swap.cpp index e3cc63e..b1add37 100644 --- a/src/swap.cpp +++ b/src/swap.cpp @@ -189,10 +189,23 @@ bool zmq::swap_t::empty () return read_pos == write_pos; } +/* bool zmq::swap_t::full () { - return buffer_space () == 1; + // Check that at least the message size can be written to the swap. + return buffer_space () < (int64_t) (sizeof (size_t) + 1); } +*/ + +bool zmq::swap_t::fits (zmq_msg_t *msg_) +{ + // Check whether whole binary representation of the message + // fits into the swap. + size_t msg_size = zmq_msg_size (msg_); + if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size)) + return false; + return true; + } void zmq::swap_t::copy_from_file (void *buffer_, size_t count_) { diff --git a/src/swap.hpp b/src/swap.hpp index 6180317..bc62fcd 100644 --- a/src/swap.hpp +++ b/src/swap.hpp @@ -59,8 +59,12 @@ namespace zmq // Returns true if the swap is empty; false otherwise. bool empty (); - // Returns true if and only if the swap is full. - bool full (); + +// // Returns true if and only if the swap is full. +// bool full (); + + // Returns true if the message fits into swap. + bool fits (zmq_msg_t *msg_); private: |