summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/decoder.hpp87
-rw-r--r--src/encoder.hpp107
-rw-r--r--src/zmq_decoder.cpp3
-rw-r--r--src/zmq_decoder.hpp2
-rw-r--r--src/zmq_encoder.cpp3
-rw-r--r--src/zmq_encoder.hpp2
-rw-r--r--src/zmq_engine.cpp72
-rw-r--r--src/zmq_engine.hpp13
8 files changed, 174 insertions, 115 deletions
diff --git a/src/decoder.hpp b/src/decoder.hpp
index 897f410..5098dd5 100644
--- a/src/decoder.hpp
+++ b/src/decoder.hpp
@@ -22,8 +22,11 @@
#include <stddef.h>
#include <string.h>
+#include <stdlib.h>
#include <algorithm>
+#include "err.hpp"
+
namespace zmq
{
@@ -42,31 +45,80 @@ namespace zmq
{
public:
- inline decoder_t () :
- read_ptr (NULL),
+ inline decoder_t (size_t bufsize_) :
+ read_pos (NULL),
to_read (0),
- next (NULL)
+ next (NULL),
+ bufsize (bufsize_)
+ {
+ buf = (unsigned char*) malloc (bufsize_);
+ zmq_assert (buf);
+ }
+
+ inline ~decoder_t ()
+ {
+ free (buf);
+ }
+
+ // Returns a buffer to be filled with binary data.
+ inline void get_buffer (unsigned char **data_, size_t *size_)
{
+ // If we are expected to read large message, we'll opt for zero-
+ // copy, i.e. we'll ask caller to fill the data directly to the
+ // message. Note that subsequent read(s) are non-blocking, thus
+ // each single read reads at most SO_RCVBUF bytes at once not
+ // depending on how large is the chunk returned from here.
+ // As a consequence, large messages being received won't block
+ // other engines running in the same I/O thread for excessive
+ // amounts of time.
+ if (to_read >= bufsize) {
+ *data_ = read_pos;
+ *size_ = to_read;
+ return;
+ }
+
+ *data_ = buf;
+ *size_ = bufsize;
}
- // Push the binary data to the decoder. Returns number of bytes
- // actually parsed.
- inline size_t write (unsigned char *data_, size_t size_)
+ // Processes the data in the buffer previously allocated using
+ // get_buffer function. size_ argument specifies nemuber of bytes
+ // actually filled into the buffer. Function returns number of
+ // bytes actually processed.
+ inline size_t process_buffer (unsigned char *data_, size_t size_)
{
+ // In case of zero-copy simply adjust the pointers, no copying
+ // is required. Also, run the state machine in case all the data
+ // were processed.
+ if (data_ == read_pos) {
+ read_pos += size_;
+ to_read -= size_;
+
+ while (!to_read)
+ if (!(static_cast <T*> (this)->*next) ())
+ return size_;
+ return size_;
+ }
+
size_t pos = 0;
while (true) {
- size_t to_copy = std::min (to_read, size_ - pos);
- if (read_ptr) {
- memcpy (read_ptr, data_ + pos, to_copy);
- read_ptr += to_copy;
- }
- pos += to_copy;
- to_read -= to_copy;
+
+ // Try to get more space in the message to fill in.
+ // If none is available, return.
while (!to_read)
if (!(static_cast <T*> (this)->*next) ())
return pos;
+
+ // If there are no more data in the buffer, return.
if (pos == size_)
return pos;
+
+ // Copy the data from buffer to the message.
+ size_t to_copy = std::min (to_read, size_ - pos);
+ memcpy (read_pos, data_ + pos, to_copy);
+ read_pos += to_copy;
+ pos += to_copy;
+ to_read -= to_copy;
}
}
@@ -78,20 +130,23 @@ namespace zmq
// This function should be called from derived class to read data
// from the buffer and schedule next state machine action.
- inline void next_step (void *read_ptr_, size_t to_read_,
+ inline void next_step (void *read_pos_, size_t to_read_,
step_t next_)
{
- read_ptr = (unsigned char*) read_ptr_;
+ read_pos = (unsigned char*) read_pos_;
to_read = to_read_;
next = next_;
}
private:
- unsigned char *read_ptr;
+ unsigned char *read_pos;
size_t to_read;
step_t next;
+ size_t bufsize;
+ unsigned char *buf;
+
decoder_t (const decoder_t&);
void operator = (const decoder_t&);
};
diff --git a/src/encoder.hpp b/src/encoder.hpp
index c41180f..35c63b0 100644
--- a/src/encoder.hpp
+++ b/src/encoder.hpp
@@ -22,8 +22,11 @@
#include <stddef.h>
#include <string.h>
+#include <stdlib.h>
#include <algorithm>
+#include "err.hpp"
+
namespace zmq
{
@@ -35,68 +38,81 @@ namespace zmq
{
public:
- inline encoder_t ()
+ inline encoder_t (size_t bufsize_) :
+ bufsize (bufsize_)
{
+ buf = (unsigned char*) malloc (bufsize_);
+ zmq_assert (buf);
}
- // The function tries to fill the supplied chunk by binary data.
- // If offset is not NULL, it is filled by offset of the first message
- // in the batch. If there's no beginning of a message in the batch,
- // offset is set to -1. Both data_ and size_ are in/out parameters.
- // Upon exit, data_ contains actual position of the data read (may
- // be different from the position requested) and size_ contains number
- // of bytes actually provided.
- inline void read (unsigned char **data_, size_t *size_,
+ inline ~encoder_t ()
+ {
+ free (buf);
+ }
+
+ // The function returns a batch of binary data. If offset is not NULL,
+ // it is filled by offset of the first message in the batch. If there's
+ // no beginning of a message in the batch, offset is set to -1.
+ inline void get_buffer (unsigned char **data_, size_t *size_,
int *offset_ = NULL)
{
- int offset = -1;
size_t pos = 0;
+ if (offset_)
+ *offset_ = -1;
+
+ while (true) {
+
+ // If there are no more data to return, run the state machine.
+ // If there are still no data, return what we already have
+ // in the buffer.
+ if (!to_write) {
+ if (!(static_cast <T*> (this)->*next) ()) {
+ *data_ = buf;
+ *size_ = pos;
+ return;
+ }
- while (pos < *size_) {
+ // If beginning of the message was processed, adjust the
+ // first-message-offset.
+ if (beginning) {
+ if (offset_ && *offset_ == -1)
+ *offset_ = pos;
+ beginning = false;
+ }
+ }
- // If we are able to fill whole buffer in a single go, let's
- // use zero-copy. There's no disadvantage to it as we cannot
- // stuck multiple messages into the buffer anyway. Note that
- // subsequent write(s) are non-blocking, thus each single
- // write writes at most SO_SNDBUF bytes at once not depending
- // on how large is the chunk returned from here.
+ // If there are no data in the buffer yet and we are able to
+ // fill whole buffer in a single go, let's use zero-copy.
+ // There's no disadvantage to it as we cannot stuck multiple
+ // messages into the buffer anyway. Note that subsequent
+ // write(s) are non-blocking, thus each single write writes
+ // at most SO_SNDBUF bytes at once not depending on how large
+ // is the chunk returned from here.
// As a consequence, large messages being sent won't block
// other engines running in the same I/O thread for excessive
// amounts of time.
- if (pos == 0 && to_write >= *size_) {
+ if (!pos && to_write >= bufsize) {
*data_ = write_pos;
- write_pos += to_write;
- pos = to_write;
+ *size_ = to_write;
+ write_pos = NULL;
to_write = 0;
- break;
+ return;
}
- if (to_write) {
-
- size_t to_copy = std::min (to_write, *size_ - pos);
- memcpy (*data_ + pos, write_pos, to_copy);
- pos += to_copy;
- write_pos += to_copy;
- to_write -= to_copy;
- }
- else {
- bool more = (static_cast <T*> (this)->*next) ();
- if (beginning && offset == -1) {
- offset = pos;
- beginning = false;
- }
- if (!more)
- break;
+ // Copy data to the buffer. If the buffer is full, return.
+ size_t to_copy = std::min (to_write, bufsize - pos);
+ memcpy (buf + pos, write_pos, to_copy);
+ pos += to_copy;
+ write_pos += to_copy;
+ to_write -= to_copy;
+ if (pos == bufsize) {
+ *data_ = buf;
+ *size_ = pos;
+ return;
}
}
-
- // Return offset of the first message in the buffer.
- if (offset_)
- *offset_ = offset;
-
- // Return the size of the filled-in portion of the buffer.
- *size_ = pos;
}
+
protected:
// Prototype of state machine action.
@@ -121,6 +137,9 @@ namespace zmq
step_t next;
bool beginning;
+ size_t bufsize;
+ unsigned char *buf;
+
encoder_t (const encoder_t&);
void operator = (const encoder_t&);
};
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
index 8040f21..f488272 100644
--- a/src/zmq_decoder.cpp
+++ b/src/zmq_decoder.cpp
@@ -22,7 +22,8 @@
#include "wire.hpp"
#include "err.hpp"
-zmq::zmq_decoder_t::zmq_decoder_t () :
+zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
+ decoder_t <zmq_decoder_t> (bufsize_),
destination (NULL)
{
zmq_msg_init (&in_progress);
diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp
index 59c8671..c5433b7 100644
--- a/src/zmq_decoder.hpp
+++ b/src/zmq_decoder.hpp
@@ -32,7 +32,7 @@ namespace zmq
{
public:
- zmq_decoder_t ();
+ zmq_decoder_t (size_t bufsize_);
~zmq_decoder_t ();
void set_inout (struct i_inout *destination_);
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index 180bda7..cf129e5 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -21,7 +21,8 @@
#include "i_inout.hpp"
#include "wire.hpp"
-zmq::zmq_encoder_t::zmq_encoder_t () :
+zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) :
+ encoder_t <zmq_encoder_t> (bufsize_),
source (NULL)
{
zmq_msg_init (&in_progress);
diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp
index 102c434..825e60f 100644
--- a/src/zmq_encoder.hpp
+++ b/src/zmq_encoder.hpp
@@ -32,7 +32,7 @@ namespace zmq
{
public:
- zmq_encoder_t ();
+ zmq_encoder_t (size_t bufsize_);
~zmq_encoder_t ();
void set_inout (struct i_inout *source_);
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index e8c9889..d474f4e 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -27,21 +27,15 @@
zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
const options_t &options_) :
io_object_t (parent_),
- inbuf (NULL),
+ inpos (NULL),
insize (0),
- inpos (0),
- outbuf (NULL),
+ decoder (in_batch_size),
+ outpos (NULL),
outsize (0),
- outpos (0),
+ encoder (out_batch_size),
inout (NULL),
options (options_)
{
- // Allocate read & write buffer.
- inbuf_storage = (unsigned char*) malloc (in_batch_size);
- zmq_assert (inbuf_storage);
- outbuf_storage = (unsigned char*) malloc (out_batch_size);
- zmq_assert (outbuf_storage);
-
// Initialise the underlying socket.
int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf);
zmq_assert (rc == 0);
@@ -49,8 +43,6 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
zmq::zmq_engine_t::~zmq_engine_t ()
{
- free (outbuf_storage);
- free (inbuf_storage);
}
void zmq::zmq_engine_t::plug (i_inout *inout_)
@@ -80,13 +72,12 @@ void zmq::zmq_engine_t::unplug ()
void zmq::zmq_engine_t::in_event ()
{
- // If there's no data to process in the buffer, read new data.
- if (inpos == insize) {
+ // If there's no data to process in the buffer...
+ if (!insize) {
- // Read as much data as possible to the read buffer.
- inbuf = inbuf_storage;
- insize = tcp_socket.read (inbuf, in_batch_size);
- inpos = 0;
+ // Retrieve the buffer and read as much data as possible.
+ decoder.get_buffer (&inpos, &insize);
+ insize = tcp_socket.read (inpos, insize);
// Check whether the peer has closed the connection.
if (insize == (size_t) -1) {
@@ -96,15 +87,15 @@ void zmq::zmq_engine_t::in_event ()
}
}
- // Following code should be executed even if there's not a single byte in
- // the buffer. There still can be a decoded messages stored in the decoder.
-
// Push the data to the decoder.
- int nbytes = decoder.write (inbuf + inpos, insize - inpos);
+ size_t processed = decoder.process_buffer (inpos, insize);
+
+ // Adjust the buffer.
+ inpos += processed;
+ insize -= processed;
- // Adjust read position. Stop polling for input if we got stuck.
- inpos += nbytes;
- if (inpos < insize)
+ // Stop polling for input if we got stuck.
+ if (processed < insize)
reset_pollin (handle);
// Flush all messages the decoder may have produced.
@@ -114,31 +105,28 @@ void zmq::zmq_engine_t::in_event ()
void zmq::zmq_engine_t::out_event ()
{
// If write buffer is empty, try to read new data from the encoder.
- if (outpos == outsize) {
-
- outbuf = outbuf_storage;
- outsize = out_batch_size;
- encoder.read (&outbuf, &outsize);
- outpos = 0;
-
+ if (!outsize) {
+ encoder.get_buffer (&outpos, &outsize);
+
// If there is no data to send, stop polling for output.
- if (outsize == 0)
+ if (outsize == 0) {
reset_pollout (handle);
+ return;
+ }
}
// If there are any data to write in write buffer, write as much as
// possible to the socket.
- if (outpos < outsize) {
- int nbytes = tcp_socket.write (outbuf + outpos, outsize - outpos);
-
- // Handle problems with the connection.
- if (nbytes == -1) {
- error ();
- return;
- }
+ int nbytes = tcp_socket.write (outpos, outsize);
- outpos += nbytes;
+ // Handle problems with the connection.
+ if (nbytes == -1) {
+ error ();
+ return;
}
+
+ outpos += nbytes;
+ outsize -= nbytes;
}
void zmq::zmq_engine_t::revive ()
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index c842da7..8d9e4b9 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -57,21 +57,16 @@ namespace zmq
tcp_socket_t tcp_socket;
handle_t handle;
- unsigned char *inbuf_storage;
- unsigned char *inbuf;
+ unsigned char *inpos;
size_t insize;
- size_t inpos;
+ zmq_decoder_t decoder;
- unsigned char *outbuf_storage;
- unsigned char *outbuf;
+ unsigned char *outpos;
size_t outsize;
- size_t outpos;
+ zmq_encoder_t encoder;
i_inout *inout;
- zmq_encoder_t encoder;
- zmq_decoder_t decoder;
-
options_t options;
zmq_engine_t (const zmq_engine_t&);