From 9be877c68503c35f9f72c8b92bd11454e4fcad97 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 8 Dec 2009 15:41:50 +0100 Subject: ZMQII-26: Use zero-copy for large messages --- src/encoder.hpp | 28 +++++++++++++++++++++++----- src/zmq_engine.cpp | 21 +++++++++++++-------- src/zmq_engine.hpp | 12 ++++++++---- 3 files changed, 44 insertions(+), 17 deletions(-) (limited to 'src') diff --git a/src/encoder.hpp b/src/encoder.hpp index 653fbfb..62abb03 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -24,6 +24,8 @@ #include #include +#include + namespace zmq { @@ -44,17 +46,31 @@ namespace zmq // NULL, it is filled by offset of the first message in the batch. // If there's no beginning of a message in the batch, offset is // set to -1. - inline size_t read (unsigned char *data_, size_t size_, + inline void read (unsigned char **data_, size_t *size_, int *offset_ = NULL) { int offset = -1; size_t pos = 0; - while (pos < size_) { + while (pos < *size_) { + + // If we are able to fill whole buffer in a single go, let's + // use zero-copy. There's no disadvantage to it as we cannot + // stuck multiple messages into the buffer anyway. + if (pos == 0 && to_write >= *size_) { + *data_ = write_pos; + write_pos += *size_; + to_write -= *size_; + + // TODO: manage beginning & offset here. + + return; + } + if (to_write) { - size_t to_copy = std::min (to_write, size_ - pos); - memcpy (data_ + pos, write_pos, to_copy); + size_t to_copy = std::min (to_write, *size_ - pos); + memcpy (*data_ + pos, write_pos, to_copy); pos += to_copy; write_pos += to_copy; to_write -= to_copy; @@ -70,10 +86,12 @@ namespace zmq } } + // Return offset of the first message in the buffer. if (offset_) *offset_ = offset; - return pos; + // Return the size of the filled-in portion of the buffer. + *size_ = pos; } protected: diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index baa0eee..e8e689d 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -26,17 +26,19 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) : io_object_t (parent_), + inbuf (NULL), insize (0), inpos (0), + outbuf (NULL), outsize (0), outpos (0), inout (NULL) { // Allocate read & write buffer. - inbuf = (unsigned char*) malloc (in_batch_size); - zmq_assert (inbuf); - outbuf = (unsigned char*) malloc (out_batch_size); - zmq_assert (outbuf); + inbuf_storage = (unsigned char*) malloc (in_batch_size); + zmq_assert (inbuf_storage); + outbuf_storage = (unsigned char*) malloc (out_batch_size); + zmq_assert (outbuf_storage); // Initialise the underlying socket. int rc = tcp_socket.open (fd_); @@ -45,8 +47,8 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) : zmq::zmq_engine_t::~zmq_engine_t () { - free (outbuf); - free (inbuf); + free (outbuf_storage); + free (inbuf_storage); } void zmq::zmq_engine_t::plug (i_inout *inout_) @@ -80,11 +82,12 @@ void zmq::zmq_engine_t::in_event () if (inpos == insize) { // Read as much data as possible to the read buffer. + inbuf = inbuf_storage; insize = tcp_socket.read (inbuf, in_batch_size); inpos = 0; // Check whether the peer has closed the connection. - if (insize == -1) { + if (insize == (size_t) -1) { insize = 0; error (); return; @@ -111,7 +114,9 @@ void zmq::zmq_engine_t::out_event () // If write buffer is empty, try to read new data from the encoder. if (outpos == outsize) { - outsize = encoder.read (outbuf, out_batch_size); + outbuf = outbuf_storage; + outsize = out_batch_size; + encoder.read (&outbuf, &outsize); outpos = 0; // If there is no data to send, stop polling for output. diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 0d1b10a..ea77b7e 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -20,6 +20,8 @@ #ifndef __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__ #define __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__ +#include + #include "i_engine.hpp" #include "io_object.hpp" #include "tcp_socket.hpp" @@ -54,13 +56,15 @@ namespace zmq tcp_socket_t tcp_socket; handle_t handle; + unsigned char *inbuf_storage; unsigned char *inbuf; - int insize; - int inpos; + size_t insize; + size_t inpos; + unsigned char *outbuf_storage; unsigned char *outbuf; - int outsize; - int outpos; + size_t outsize; + size_t outpos; i_inout *inout; -- cgit v1.2.3