summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-12-08 15:41:50 +0100
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-12-08 15:41:50 +0100
commit9be877c68503c35f9f72c8b92bd11454e4fcad97 (patch)
tree8e5ea05ee40adbe49d7ef9edf1f841681d0e95b8 /src
parentbfef2fcd0ba590169ad46ea45da9d36dca1b5b97 (diff)
ZMQII-26: Use zero-copy for large messages
Diffstat (limited to 'src')
-rw-r--r--src/encoder.hpp28
-rw-r--r--src/zmq_engine.cpp21
-rw-r--r--src/zmq_engine.hpp12
3 files changed, 44 insertions, 17 deletions
diff --git a/src/encoder.hpp b/src/encoder.hpp
index 653fbfb..62abb03 100644
--- a/src/encoder.hpp
+++ b/src/encoder.hpp
@@ -24,6 +24,8 @@
#include <string.h>
#include <algorithm>
+#include <stdio.h>
+
namespace zmq
{
@@ -44,17 +46,31 @@ namespace zmq
// NULL, it is filled by offset of the first message in the batch.
// If there's no beginning of a message in the batch, offset is
// set to -1.
- inline size_t read (unsigned char *data_, size_t size_,
+ inline void read (unsigned char **data_, size_t *size_,
int *offset_ = NULL)
{
int offset = -1;
size_t pos = 0;
- while (pos < size_) {
+ while (pos < *size_) {
+
+ // If we are able to fill whole buffer in a single go, let's
+ // use zero-copy. There's no disadvantage to it as we cannot
+ // stuck multiple messages into the buffer anyway.
+ if (pos == 0 && to_write >= *size_) {
+ *data_ = write_pos;
+ write_pos += *size_;
+ to_write -= *size_;
+
+ // TODO: manage beginning & offset here.
+
+ return;
+ }
+
if (to_write) {
- size_t to_copy = std::min (to_write, size_ - pos);
- memcpy (data_ + pos, write_pos, to_copy);
+ size_t to_copy = std::min (to_write, *size_ - pos);
+ memcpy (*data_ + pos, write_pos, to_copy);
pos += to_copy;
write_pos += to_copy;
to_write -= to_copy;
@@ -70,10 +86,12 @@ namespace zmq
}
}
+ // Return offset of the first message in the buffer.
if (offset_)
*offset_ = offset;
- return pos;
+ // Return the size of the filled-in portion of the buffer.
+ *size_ = pos;
}
protected:
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index baa0eee..e8e689d 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -26,17 +26,19 @@
zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :
io_object_t (parent_),
+ inbuf (NULL),
insize (0),
inpos (0),
+ outbuf (NULL),
outsize (0),
outpos (0),
inout (NULL)
{
// Allocate read & write buffer.
- inbuf = (unsigned char*) malloc (in_batch_size);
- zmq_assert (inbuf);
- outbuf = (unsigned char*) malloc (out_batch_size);
- zmq_assert (outbuf);
+ inbuf_storage = (unsigned char*) malloc (in_batch_size);
+ zmq_assert (inbuf_storage);
+ outbuf_storage = (unsigned char*) malloc (out_batch_size);
+ zmq_assert (outbuf_storage);
// Initialise the underlying socket.
int rc = tcp_socket.open (fd_);
@@ -45,8 +47,8 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :
zmq::zmq_engine_t::~zmq_engine_t ()
{
- free (outbuf);
- free (inbuf);
+ free (outbuf_storage);
+ free (inbuf_storage);
}
void zmq::zmq_engine_t::plug (i_inout *inout_)
@@ -80,11 +82,12 @@ void zmq::zmq_engine_t::in_event ()
if (inpos == insize) {
// Read as much data as possible to the read buffer.
+ inbuf = inbuf_storage;
insize = tcp_socket.read (inbuf, in_batch_size);
inpos = 0;
// Check whether the peer has closed the connection.
- if (insize == -1) {
+ if (insize == (size_t) -1) {
insize = 0;
error ();
return;
@@ -111,7 +114,9 @@ void zmq::zmq_engine_t::out_event ()
// If write buffer is empty, try to read new data from the encoder.
if (outpos == outsize) {
- outsize = encoder.read (outbuf, out_batch_size);
+ outbuf = outbuf_storage;
+ outsize = out_batch_size;
+ encoder.read (&outbuf, &outsize);
outpos = 0;
// If there is no data to send, stop polling for output.
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index 0d1b10a..ea77b7e 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -20,6 +20,8 @@
#ifndef __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__
#define __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__
+#include <stddef.h>
+
#include "i_engine.hpp"
#include "io_object.hpp"
#include "tcp_socket.hpp"
@@ -54,13 +56,15 @@ namespace zmq
tcp_socket_t tcp_socket;
handle_t handle;
+ unsigned char *inbuf_storage;
unsigned char *inbuf;
- int insize;
- int inpos;
+ size_t insize;
+ size_t inpos;
+ unsigned char *outbuf_storage;
unsigned char *outbuf;
- int outsize;
- int outpos;
+ size_t outsize;
+ size_t outpos;
i_inout *inout;