summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/encoder.hpp12
-rw-r--r--src/stream_engine.cpp9
2 files changed, 16 insertions, 5 deletions
diff --git a/src/encoder.hpp b/src/encoder.hpp
index 93efbb8..d5ea467 100644
--- a/src/encoder.hpp
+++ b/src/encoder.hpp
@@ -63,7 +63,11 @@ namespace xs
// If offset is not NULL, it is filled by offset of the first message
// in the batch.If there's no beginning of a message in the batch,
// offset is set to -1.
- inline void get_data (unsigned char **data_, size_t *size_,
+ // If data is returned and return value is false, there are no more
+ // data available to get from the encoder the next time. Therefore,
+ // an optimisation can be made and the engine can stop polling for
+ // POLLOUT immediately.
+ inline bool get_data (unsigned char **data_, size_t *size_,
int *offset_ = NULL)
{
unsigned char *buffer = !*data_ ? buf : *data_;
@@ -82,7 +86,7 @@ namespace xs
if (!(static_cast <T*> (this)->*next) ()) {
*data_ = buffer;
*size_ = pos;
- return;
+ return false;
}
// If beginning of the message was processed, adjust the
@@ -109,7 +113,7 @@ namespace xs
*size_ = to_write;
write_pos = NULL;
to_write = 0;
- return;
+ return true;
}
// Copy data to the buffer. If the buffer is full, return.
@@ -121,7 +125,7 @@ namespace xs
if (pos == buffersize) {
*data_ = buffer;
*size_ = pos;
- return;
+ return true;
}
}
}
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index 0034737..0ae96d2 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -208,11 +208,13 @@ void xs::stream_engine_t::in_event ()
void xs::stream_engine_t::out_event ()
{
+ bool more_data = true;
+
// If write buffer is empty, try to read new data from the encoder.
if (!outsize) {
outpos = NULL;
- encoder.get_data (&outpos, &outsize);
+ more_data = encoder.get_data (&outpos, &outsize);
// If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) {
@@ -243,6 +245,11 @@ void xs::stream_engine_t::out_event ()
outpos += nbytes;
outsize -= nbytes;
+
+ // If the encoder reports that there are no more data to get from it
+ // we can stop polling for POLLOUT immediately.
+ if (!more_data)
+ reset_pollout (handle);
}
void xs::stream_engine_t::activate_out ()