diff options
| -rw-r--r-- | src/encoder.hpp | 28 | ||||
| -rw-r--r-- | src/zmq_engine.cpp | 21 | ||||
| -rw-r--r-- | src/zmq_engine.hpp | 12 | 
3 files changed, 44 insertions, 17 deletions
| diff --git a/src/encoder.hpp b/src/encoder.hpp index 653fbfb..62abb03 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -24,6 +24,8 @@  #include <string.h>  #include <algorithm> +#include <stdio.h> +  namespace zmq  { @@ -44,17 +46,31 @@ namespace zmq          //  NULL, it is filled by offset of the first message in the batch.          //  If there's no beginning of a message in the batch, offset is          //  set to -1. -        inline size_t read (unsigned char *data_, size_t size_, +        inline void read (unsigned char **data_, size_t *size_,              int *offset_ = NULL)          {              int offset = -1;              size_t pos = 0; -            while (pos < size_) { +            while (pos < *size_) { + +                //  If we are able to fill whole buffer in a single go, let's +                //  use zero-copy. There's no disadvantage to it as we cannot +                //  stuck multiple messages into the buffer anyway. +                if (pos == 0 && to_write >= *size_) { +                    *data_ = write_pos; +                    write_pos += *size_; +                    to_write -= *size_; + +                    //  TODO: manage beginning & offset here. + +                    return; +                } +                  if (to_write) { -                    size_t to_copy = std::min (to_write, size_ - pos); -                    memcpy (data_ + pos, write_pos, to_copy); +                    size_t to_copy = std::min (to_write, *size_ - pos); +                    memcpy (*data_ + pos, write_pos, to_copy);                      pos += to_copy;                      write_pos += to_copy;                      to_write -= to_copy; @@ -70,10 +86,12 @@ namespace zmq                  }              } +            //  Return offset of the first message in the buffer.              if (offset_)                  *offset_ = offset; -            return pos; +            //  Return the size of the filled-in portion of the buffer. +            *size_ = pos;          }      protected: diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index baa0eee..e8e689d 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -26,17 +26,19 @@  zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :      io_object_t (parent_), +    inbuf (NULL),      insize (0),      inpos (0), +    outbuf (NULL),      outsize (0),      outpos (0),      inout (NULL)  {      //  Allocate read & write buffer. -    inbuf = (unsigned char*) malloc (in_batch_size); -    zmq_assert (inbuf); -    outbuf = (unsigned char*) malloc (out_batch_size); -    zmq_assert (outbuf); +    inbuf_storage = (unsigned char*) malloc (in_batch_size); +    zmq_assert (inbuf_storage); +    outbuf_storage = (unsigned char*) malloc (out_batch_size); +    zmq_assert (outbuf_storage);      //  Initialise the underlying socket.      int rc = tcp_socket.open (fd_); @@ -45,8 +47,8 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :  zmq::zmq_engine_t::~zmq_engine_t ()  { -    free (outbuf); -    free (inbuf); +    free (outbuf_storage); +    free (inbuf_storage);  }  void zmq::zmq_engine_t::plug (i_inout *inout_) @@ -80,11 +82,12 @@ void zmq::zmq_engine_t::in_event ()      if (inpos == insize) {          //  Read as much data as possible to the read buffer. +        inbuf = inbuf_storage;          insize = tcp_socket.read (inbuf, in_batch_size);          inpos = 0;          //  Check whether the peer has closed the connection. -        if (insize == -1) { +        if (insize == (size_t) -1) {              insize = 0;              error ();              return; @@ -111,7 +114,9 @@ void zmq::zmq_engine_t::out_event ()      //  If write buffer is empty, try to read new data from the encoder.      if (outpos == outsize) { -        outsize = encoder.read (outbuf, out_batch_size); +        outbuf = outbuf_storage; +        outsize = out_batch_size; +        encoder.read (&outbuf, &outsize);          outpos = 0;          //  If there is no data to send, stop polling for output. diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 0d1b10a..ea77b7e 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -20,6 +20,8 @@  #ifndef __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__  #define __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__ +#include <stddef.h> +  #include "i_engine.hpp"  #include "io_object.hpp"  #include "tcp_socket.hpp" @@ -54,13 +56,15 @@ namespace zmq          tcp_socket_t tcp_socket;          handle_t handle; +        unsigned char *inbuf_storage;          unsigned char *inbuf; -        int insize; -        int inpos; +        size_t insize; +        size_t inpos; +        unsigned char *outbuf_storage;          unsigned char *outbuf; -        int outsize; -        int outpos; +        size_t outsize; +        size_t outpos;          i_inout *inout; | 
