From d5670f34baa0751a5b4567a28caea4e4fa208727 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 11 Dec 2009 22:29:04 +0100 Subject: ZMQII-26: Use zero-copy for large messages (rx side) --- src/decoder.hpp | 87 ++++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 71 insertions(+), 16 deletions(-) (limited to 'src/decoder.hpp') diff --git a/src/decoder.hpp b/src/decoder.hpp index 897f410..5098dd5 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -22,8 +22,11 @@ #include #include +#include #include +#include "err.hpp" + namespace zmq { @@ -42,31 +45,80 @@ namespace zmq { public: - inline decoder_t () : - read_ptr (NULL), + inline decoder_t (size_t bufsize_) : + read_pos (NULL), to_read (0), - next (NULL) + next (NULL), + bufsize (bufsize_) + { + buf = (unsigned char*) malloc (bufsize_); + zmq_assert (buf); + } + + inline ~decoder_t () + { + free (buf); + } + + // Returns a buffer to be filled with binary data. + inline void get_buffer (unsigned char **data_, size_t *size_) { + // If we are expected to read large message, we'll opt for zero- + // copy, i.e. we'll ask caller to fill the data directly to the + // message. Note that subsequent read(s) are non-blocking, thus + // each single read reads at most SO_RCVBUF bytes at once not + // depending on how large is the chunk returned from here. + // As a consequence, large messages being received won't block + // other engines running in the same I/O thread for excessive + // amounts of time. + if (to_read >= bufsize) { + *data_ = read_pos; + *size_ = to_read; + return; + } + + *data_ = buf; + *size_ = bufsize; } - // Push the binary data to the decoder. Returns number of bytes - // actually parsed. - inline size_t write (unsigned char *data_, size_t size_) + // Processes the data in the buffer previously allocated using + // get_buffer function. size_ argument specifies nemuber of bytes + // actually filled into the buffer. Function returns number of + // bytes actually processed. + inline size_t process_buffer (unsigned char *data_, size_t size_) { + // In case of zero-copy simply adjust the pointers, no copying + // is required. Also, run the state machine in case all the data + // were processed. + if (data_ == read_pos) { + read_pos += size_; + to_read -= size_; + + while (!to_read) + if (!(static_cast (this)->*next) ()) + return size_; + return size_; + } + size_t pos = 0; while (true) { - size_t to_copy = std::min (to_read, size_ - pos); - if (read_ptr) { - memcpy (read_ptr, data_ + pos, to_copy); - read_ptr += to_copy; - } - pos += to_copy; - to_read -= to_copy; + + // Try to get more space in the message to fill in. + // If none is available, return. while (!to_read) if (!(static_cast (this)->*next) ()) return pos; + + // If there are no more data in the buffer, return. if (pos == size_) return pos; + + // Copy the data from buffer to the message. + size_t to_copy = std::min (to_read, size_ - pos); + memcpy (read_pos, data_ + pos, to_copy); + read_pos += to_copy; + pos += to_copy; + to_read -= to_copy; } } @@ -78,20 +130,23 @@ namespace zmq // This function should be called from derived class to read data // from the buffer and schedule next state machine action. - inline void next_step (void *read_ptr_, size_t to_read_, + inline void next_step (void *read_pos_, size_t to_read_, step_t next_) { - read_ptr = (unsigned char*) read_ptr_; + read_pos = (unsigned char*) read_pos_; to_read = to_read_; next = next_; } private: - unsigned char *read_ptr; + unsigned char *read_pos; size_t to_read; step_t next; + size_t bufsize; + unsigned char *buf; + decoder_t (const decoder_t&); void operator = (const decoder_t&); }; -- cgit v1.2.3