summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/lb.cpp9
-rw-r--r--src/pair.cpp5
-rw-r--r--src/pipe.cpp8
-rw-r--r--src/pipe.hpp6
-rw-r--r--src/swap.cpp15
-rw-r--r--src/swap.hpp8
6 files changed, 39 insertions, 12 deletions
diff --git a/src/lb.cpp b/src/lb.cpp
index a784013..5e29ef4 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -128,8 +128,15 @@ bool zmq::lb_t::has_out ()
return true;
while (active > 0) {
- if (pipes [current]->check_write ())
+
+ // Check whether zero-sized message can be written to the pipe.
+ zmq_msg_t msg;
+ zmq_msg_init (&msg);
+ if (pipes [current]->check_write (&msg)) {
+ zmq_msg_close (&msg);
return true;
+ }
+ zmq_msg_close (&msg);
// Deactivate the pipe.
active--;
diff --git a/src/pair.cpp b/src/pair.cpp
index 6442b32..ace3550 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -170,7 +170,10 @@ bool zmq::pair_t::xhas_out ()
if (!outpipe || !outpipe_alive)
return false;
- outpipe_alive = outpipe->check_write ();
+ zmq_msg_t msg;
+ zmq_msg_init (&msg);
+ outpipe_alive = outpipe->check_write (&msg);
+ zmq_msg_close (&msg);
return outpipe_alive;
}
diff --git a/src/pipe.cpp b/src/pipe.cpp
index a60e519..bbdd44e 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -200,15 +200,15 @@ void zmq::writer_t::set_event_sink (i_writer_events *sink_)
sink = sink_;
}
-bool zmq::writer_t::check_write ()
+bool zmq::writer_t::check_write (zmq_msg_t *msg_)
{
// We've already checked and there's no space free for the new message.
// There's no point in checking once again.
if (unlikely (!active))
return false;
-
+
if (unlikely (swapping)) {
- if (unlikely (swap->full ())) {
+ if (unlikely (!swap->fits (msg_))) {
active = false;
return false;
}
@@ -229,7 +229,7 @@ bool zmq::writer_t::check_write ()
bool zmq::writer_t::write (zmq_msg_t *msg_)
{
- if (unlikely (!check_write ()))
+ if (unlikely (!check_write (msg_)))
return false;
if (unlikely (swapping)) {
diff --git a/src/pipe.hpp b/src/pipe.hpp
index 711a295..a956ce3 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -134,10 +134,10 @@ namespace zmq
// Specifies the object to get events from the writer.
void set_event_sink (i_writer_events *endpoint_);
- // Checks whether a message can be written to the pipe.
+ // Checks whether messages can be written to the pipe.
// If writing the message would cause high watermark and (optionally)
- // swap to be exceeded, the function returns false.
- bool check_write ();
+ // if the swap is full, the function returns false.
+ bool check_write (zmq_msg_t *msg_);
// Writes a message to the underlying pipe. Returns false if the
// message cannot be written because high watermark was reached.
diff --git a/src/swap.cpp b/src/swap.cpp
index e3cc63e..b1add37 100644
--- a/src/swap.cpp
+++ b/src/swap.cpp
@@ -189,10 +189,23 @@ bool zmq::swap_t::empty ()
return read_pos == write_pos;
}
+/*
bool zmq::swap_t::full ()
{
- return buffer_space () == 1;
+ // Check that at least the message size can be written to the swap.
+ return buffer_space () < (int64_t) (sizeof (size_t) + 1);
}
+*/
+
+bool zmq::swap_t::fits (zmq_msg_t *msg_)
+{
+ // Check whether whole binary representation of the message
+ // fits into the swap.
+ size_t msg_size = zmq_msg_size (msg_);
+ if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size))
+ return false;
+ return true;
+ }
void zmq::swap_t::copy_from_file (void *buffer_, size_t count_)
{
diff --git a/src/swap.hpp b/src/swap.hpp
index 6180317..bc62fcd 100644
--- a/src/swap.hpp
+++ b/src/swap.hpp
@@ -59,8 +59,12 @@ namespace zmq
// Returns true if the swap is empty; false otherwise.
bool empty ();
- // Returns true if and only if the swap is full.
- bool full ();
+
+// // Returns true if and only if the swap is full.
+// bool full ();
+
+ // Returns true if the message fits into swap.
+ bool fits (zmq_msg_t *msg_);
private: