From d5670f34baa0751a5b4567a28caea4e4fa208727 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 11 Dec 2009 22:29:04 +0100 Subject: ZMQII-26: Use zero-copy for large messages (rx side) --- src/zmq_engine.cpp | 72 +++++++++++++++++++++++------------------------------- 1 file changed, 30 insertions(+), 42 deletions(-) (limited to 'src/zmq_engine.cpp') diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index e8c9889..d474f4e 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -27,21 +27,15 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, const options_t &options_) : io_object_t (parent_), - inbuf (NULL), + inpos (NULL), insize (0), - inpos (0), - outbuf (NULL), + decoder (in_batch_size), + outpos (NULL), outsize (0), - outpos (0), + encoder (out_batch_size), inout (NULL), options (options_) { - // Allocate read & write buffer. - inbuf_storage = (unsigned char*) malloc (in_batch_size); - zmq_assert (inbuf_storage); - outbuf_storage = (unsigned char*) malloc (out_batch_size); - zmq_assert (outbuf_storage); - // Initialise the underlying socket. int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf); zmq_assert (rc == 0); @@ -49,8 +43,6 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, zmq::zmq_engine_t::~zmq_engine_t () { - free (outbuf_storage); - free (inbuf_storage); } void zmq::zmq_engine_t::plug (i_inout *inout_) @@ -80,13 +72,12 @@ void zmq::zmq_engine_t::unplug () void zmq::zmq_engine_t::in_event () { - // If there's no data to process in the buffer, read new data. - if (inpos == insize) { + // If there's no data to process in the buffer... + if (!insize) { - // Read as much data as possible to the read buffer. - inbuf = inbuf_storage; - insize = tcp_socket.read (inbuf, in_batch_size); - inpos = 0; + // Retrieve the buffer and read as much data as possible. + decoder.get_buffer (&inpos, &insize); + insize = tcp_socket.read (inpos, insize); // Check whether the peer has closed the connection. if (insize == (size_t) -1) { @@ -96,15 +87,15 @@ void zmq::zmq_engine_t::in_event () } } - // Following code should be executed even if there's not a single byte in - // the buffer. There still can be a decoded messages stored in the decoder. - // Push the data to the decoder. - int nbytes = decoder.write (inbuf + inpos, insize - inpos); + size_t processed = decoder.process_buffer (inpos, insize); + + // Adjust the buffer. + inpos += processed; + insize -= processed; - // Adjust read position. Stop polling for input if we got stuck. - inpos += nbytes; - if (inpos < insize) + // Stop polling for input if we got stuck. + if (processed < insize) reset_pollin (handle); // Flush all messages the decoder may have produced. @@ -114,31 +105,28 @@ void zmq::zmq_engine_t::in_event () void zmq::zmq_engine_t::out_event () { // If write buffer is empty, try to read new data from the encoder. - if (outpos == outsize) { - - outbuf = outbuf_storage; - outsize = out_batch_size; - encoder.read (&outbuf, &outsize); - outpos = 0; - + if (!outsize) { + encoder.get_buffer (&outpos, &outsize); + // If there is no data to send, stop polling for output. - if (outsize == 0) + if (outsize == 0) { reset_pollout (handle); + return; + } } // If there are any data to write in write buffer, write as much as // possible to the socket. - if (outpos < outsize) { - int nbytes = tcp_socket.write (outbuf + outpos, outsize - outpos); - - // Handle problems with the connection. - if (nbytes == -1) { - error (); - return; - } + int nbytes = tcp_socket.write (outpos, outsize); - outpos += nbytes; + // Handle problems with the connection. + if (nbytes == -1) { + error (); + return; } + + outpos += nbytes; + outsize -= nbytes; } void zmq::zmq_engine_t::revive () -- cgit v1.2.3