summaryrefslogtreecommitdiff
path: root/src/decoder.hpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-12-11 22:29:04 +0100
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-12-11 22:29:04 +0100
commitd5670f34baa0751a5b4567a28caea4e4fa208727 (patch)
treeabddcd8965f7a9dedec24ffd3be70eb7a9460fba /src/decoder.hpp
parent770178724f9493e99c832863031aef016f143e9f (diff)
ZMQII-26: Use zero-copy for large messages (rx side)
Diffstat (limited to 'src/decoder.hpp')
-rw-r--r--src/decoder.hpp87
1 files changed, 71 insertions, 16 deletions
diff --git a/src/decoder.hpp b/src/decoder.hpp
index 897f410..5098dd5 100644
--- a/src/decoder.hpp
+++ b/src/decoder.hpp
@@ -22,8 +22,11 @@
#include <stddef.h>
#include <string.h>
+#include <stdlib.h>
#include <algorithm>
+#include "err.hpp"
+
namespace zmq
{
@@ -42,31 +45,80 @@ namespace zmq
{
public:
- inline decoder_t () :
- read_ptr (NULL),
+ inline decoder_t (size_t bufsize_) :
+ read_pos (NULL),
to_read (0),
- next (NULL)
+ next (NULL),
+ bufsize (bufsize_)
+ {
+ buf = (unsigned char*) malloc (bufsize_);
+ zmq_assert (buf);
+ }
+
+ inline ~decoder_t ()
+ {
+ free (buf);
+ }
+
+ // Returns a buffer to be filled with binary data.
+ inline void get_buffer (unsigned char **data_, size_t *size_)
{
+ // If we are expected to read large message, we'll opt for zero-
+ // copy, i.e. we'll ask caller to fill the data directly to the
+ // message. Note that subsequent read(s) are non-blocking, thus
+ // each single read reads at most SO_RCVBUF bytes at once not
+ // depending on how large is the chunk returned from here.
+ // As a consequence, large messages being received won't block
+ // other engines running in the same I/O thread for excessive
+ // amounts of time.
+ if (to_read >= bufsize) {
+ *data_ = read_pos;
+ *size_ = to_read;
+ return;
+ }
+
+ *data_ = buf;
+ *size_ = bufsize;
}
- // Push the binary data to the decoder. Returns number of bytes
- // actually parsed.
- inline size_t write (unsigned char *data_, size_t size_)
+ // Processes the data in the buffer previously allocated using
+ // get_buffer function. size_ argument specifies nemuber of bytes
+ // actually filled into the buffer. Function returns number of
+ // bytes actually processed.
+ inline size_t process_buffer (unsigned char *data_, size_t size_)
{
+ // In case of zero-copy simply adjust the pointers, no copying
+ // is required. Also, run the state machine in case all the data
+ // were processed.
+ if (data_ == read_pos) {
+ read_pos += size_;
+ to_read -= size_;
+
+ while (!to_read)
+ if (!(static_cast <T*> (this)->*next) ())
+ return size_;
+ return size_;
+ }
+
size_t pos = 0;
while (true) {
- size_t to_copy = std::min (to_read, size_ - pos);
- if (read_ptr) {
- memcpy (read_ptr, data_ + pos, to_copy);
- read_ptr += to_copy;
- }
- pos += to_copy;
- to_read -= to_copy;
+
+ // Try to get more space in the message to fill in.
+ // If none is available, return.
while (!to_read)
if (!(static_cast <T*> (this)->*next) ())
return pos;
+
+ // If there are no more data in the buffer, return.
if (pos == size_)
return pos;
+
+ // Copy the data from buffer to the message.
+ size_t to_copy = std::min (to_read, size_ - pos);
+ memcpy (read_pos, data_ + pos, to_copy);
+ read_pos += to_copy;
+ pos += to_copy;
+ to_read -= to_copy;
}
}
@@ -78,20 +130,23 @@ namespace zmq
// This function should be called from derived class to read data
// from the buffer and schedule next state machine action.
- inline void next_step (void *read_ptr_, size_t to_read_,
+ inline void next_step (void *read_pos_, size_t to_read_,
step_t next_)
{
- read_ptr = (unsigned char*) read_ptr_;
+ read_pos = (unsigned char*) read_pos_;
to_read = to_read_;
next = next_;
}
private:
- unsigned char *read_ptr;
+ unsigned char *read_pos;
size_t to_read;
step_t next;
+ size_t bufsize;
+ unsigned char *buf;
+
decoder_t (const decoder_t&);
void operator = (const decoder_t&);
};