From d5670f34baa0751a5b4567a28caea4e4fa208727 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 11 Dec 2009 22:29:04 +0100 Subject: ZMQII-26: Use zero-copy for large messages (rx side) --- src/encoder.hpp | 107 +++++++++++++++++++++++++++++++++----------------------- 1 file changed, 63 insertions(+), 44 deletions(-) (limited to 'src/encoder.hpp') diff --git a/src/encoder.hpp b/src/encoder.hpp index c41180f..35c63b0 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -22,8 +22,11 @@ #include #include +#include #include +#include "err.hpp" + namespace zmq { @@ -35,68 +38,81 @@ namespace zmq { public: - inline encoder_t () + inline encoder_t (size_t bufsize_) : + bufsize (bufsize_) { + buf = (unsigned char*) malloc (bufsize_); + zmq_assert (buf); } - // The function tries to fill the supplied chunk by binary data. - // If offset is not NULL, it is filled by offset of the first message - // in the batch. If there's no beginning of a message in the batch, - // offset is set to -1. Both data_ and size_ are in/out parameters. - // Upon exit, data_ contains actual position of the data read (may - // be different from the position requested) and size_ contains number - // of bytes actually provided. - inline void read (unsigned char **data_, size_t *size_, + inline ~encoder_t () + { + free (buf); + } + + // The function returns a batch of binary data. If offset is not NULL, + // it is filled by offset of the first message in the batch. If there's + // no beginning of a message in the batch, offset is set to -1. + inline void get_buffer (unsigned char **data_, size_t *size_, int *offset_ = NULL) { - int offset = -1; size_t pos = 0; + if (offset_) + *offset_ = -1; + + while (true) { + + // If there are no more data to return, run the state machine. + // If there are still no data, return what we already have + // in the buffer. + if (!to_write) { + if (!(static_cast (this)->*next) ()) { + *data_ = buf; + *size_ = pos; + return; + } - while (pos < *size_) { + // If beginning of the message was processed, adjust the + // first-message-offset. + if (beginning) { + if (offset_ && *offset_ == -1) + *offset_ = pos; + beginning = false; + } + } - // If we are able to fill whole buffer in a single go, let's - // use zero-copy. There's no disadvantage to it as we cannot - // stuck multiple messages into the buffer anyway. Note that - // subsequent write(s) are non-blocking, thus each single - // write writes at most SO_SNDBUF bytes at once not depending - // on how large is the chunk returned from here. + // If there are no data in the buffer yet and we are able to + // fill whole buffer in a single go, let's use zero-copy. + // There's no disadvantage to it as we cannot stuck multiple + // messages into the buffer anyway. Note that subsequent + // write(s) are non-blocking, thus each single write writes + // at most SO_SNDBUF bytes at once not depending on how large + // is the chunk returned from here. // As a consequence, large messages being sent won't block // other engines running in the same I/O thread for excessive // amounts of time. - if (pos == 0 && to_write >= *size_) { + if (!pos && to_write >= bufsize) { *data_ = write_pos; - write_pos += to_write; - pos = to_write; + *size_ = to_write; + write_pos = NULL; to_write = 0; - break; + return; } - if (to_write) { - - size_t to_copy = std::min (to_write, *size_ - pos); - memcpy (*data_ + pos, write_pos, to_copy); - pos += to_copy; - write_pos += to_copy; - to_write -= to_copy; - } - else { - bool more = (static_cast (this)->*next) (); - if (beginning && offset == -1) { - offset = pos; - beginning = false; - } - if (!more) - break; + // Copy data to the buffer. If the buffer is full, return. + size_t to_copy = std::min (to_write, bufsize - pos); + memcpy (buf + pos, write_pos, to_copy); + pos += to_copy; + write_pos += to_copy; + to_write -= to_copy; + if (pos == bufsize) { + *data_ = buf; + *size_ = pos; + return; } } - - // Return offset of the first message in the buffer. - if (offset_) - *offset_ = offset; - - // Return the size of the filled-in portion of the buffer. - *size_ = pos; } + protected: // Prototype of state machine action. @@ -121,6 +137,9 @@ namespace zmq step_t next; bool beginning; + size_t bufsize; + unsigned char *buf; + encoder_t (const encoder_t&); void operator = (const encoder_t&); }; -- cgit v1.2.3