diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/decoder.hpp | 87 | ||||
-rw-r--r-- | src/encoder.hpp | 107 | ||||
-rw-r--r-- | src/zmq_decoder.cpp | 3 | ||||
-rw-r--r-- | src/zmq_decoder.hpp | 2 | ||||
-rw-r--r-- | src/zmq_encoder.cpp | 3 | ||||
-rw-r--r-- | src/zmq_encoder.hpp | 2 | ||||
-rw-r--r-- | src/zmq_engine.cpp | 72 | ||||
-rw-r--r-- | src/zmq_engine.hpp | 13 |
8 files changed, 174 insertions, 115 deletions
diff --git a/src/decoder.hpp b/src/decoder.hpp index 897f410..5098dd5 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -22,8 +22,11 @@ #include <stddef.h> #include <string.h> +#include <stdlib.h> #include <algorithm> +#include "err.hpp" + namespace zmq { @@ -42,31 +45,80 @@ namespace zmq { public: - inline decoder_t () : - read_ptr (NULL), + inline decoder_t (size_t bufsize_) : + read_pos (NULL), to_read (0), - next (NULL) + next (NULL), + bufsize (bufsize_) + { + buf = (unsigned char*) malloc (bufsize_); + zmq_assert (buf); + } + + inline ~decoder_t () + { + free (buf); + } + + // Returns a buffer to be filled with binary data. + inline void get_buffer (unsigned char **data_, size_t *size_) { + // If we are expected to read large message, we'll opt for zero- + // copy, i.e. we'll ask caller to fill the data directly to the + // message. Note that subsequent read(s) are non-blocking, thus + // each single read reads at most SO_RCVBUF bytes at once not + // depending on how large is the chunk returned from here. + // As a consequence, large messages being received won't block + // other engines running in the same I/O thread for excessive + // amounts of time. + if (to_read >= bufsize) { + *data_ = read_pos; + *size_ = to_read; + return; + } + + *data_ = buf; + *size_ = bufsize; } - // Push the binary data to the decoder. Returns number of bytes - // actually parsed. - inline size_t write (unsigned char *data_, size_t size_) + // Processes the data in the buffer previously allocated using + // get_buffer function. size_ argument specifies nemuber of bytes + // actually filled into the buffer. Function returns number of + // bytes actually processed. + inline size_t process_buffer (unsigned char *data_, size_t size_) { + // In case of zero-copy simply adjust the pointers, no copying + // is required. Also, run the state machine in case all the data + // were processed. + if (data_ == read_pos) { + read_pos += size_; + to_read -= size_; + + while (!to_read) + if (!(static_cast <T*> (this)->*next) ()) + return size_; + return size_; + } + size_t pos = 0; while (true) { - size_t to_copy = std::min (to_read, size_ - pos); - if (read_ptr) { - memcpy (read_ptr, data_ + pos, to_copy); - read_ptr += to_copy; - } - pos += to_copy; - to_read -= to_copy; + + // Try to get more space in the message to fill in. + // If none is available, return. while (!to_read) if (!(static_cast <T*> (this)->*next) ()) return pos; + + // If there are no more data in the buffer, return. if (pos == size_) return pos; + + // Copy the data from buffer to the message. + size_t to_copy = std::min (to_read, size_ - pos); + memcpy (read_pos, data_ + pos, to_copy); + read_pos += to_copy; + pos += to_copy; + to_read -= to_copy; } } @@ -78,20 +130,23 @@ namespace zmq // This function should be called from derived class to read data // from the buffer and schedule next state machine action. - inline void next_step (void *read_ptr_, size_t to_read_, + inline void next_step (void *read_pos_, size_t to_read_, step_t next_) { - read_ptr = (unsigned char*) read_ptr_; + read_pos = (unsigned char*) read_pos_; to_read = to_read_; next = next_; } private: - unsigned char *read_ptr; + unsigned char *read_pos; size_t to_read; step_t next; + size_t bufsize; + unsigned char *buf; + decoder_t (const decoder_t&); void operator = (const decoder_t&); }; diff --git a/src/encoder.hpp b/src/encoder.hpp index c41180f..35c63b0 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -22,8 +22,11 @@ #include <stddef.h> #include <string.h> +#include <stdlib.h> #include <algorithm> +#include "err.hpp" + namespace zmq { @@ -35,68 +38,81 @@ namespace zmq { public: - inline encoder_t () + inline encoder_t (size_t bufsize_) : + bufsize (bufsize_) { + buf = (unsigned char*) malloc (bufsize_); + zmq_assert (buf); } - // The function tries to fill the supplied chunk by binary data. - // If offset is not NULL, it is filled by offset of the first message - // in the batch. If there's no beginning of a message in the batch, - // offset is set to -1. Both data_ and size_ are in/out parameters. - // Upon exit, data_ contains actual position of the data read (may - // be different from the position requested) and size_ contains number - // of bytes actually provided. - inline void read (unsigned char **data_, size_t *size_, + inline ~encoder_t () + { + free (buf); + } + + // The function returns a batch of binary data. If offset is not NULL, + // it is filled by offset of the first message in the batch. If there's + // no beginning of a message in the batch, offset is set to -1. + inline void get_buffer (unsigned char **data_, size_t *size_, int *offset_ = NULL) { - int offset = -1; size_t pos = 0; + if (offset_) + *offset_ = -1; + + while (true) { + + // If there are no more data to return, run the state machine. + // If there are still no data, return what we already have + // in the buffer. + if (!to_write) { + if (!(static_cast <T*> (this)->*next) ()) { + *data_ = buf; + *size_ = pos; + return; + } - while (pos < *size_) { + // If beginning of the message was processed, adjust the + // first-message-offset. + if (beginning) { + if (offset_ && *offset_ == -1) + *offset_ = pos; + beginning = false; + } + } - // If we are able to fill whole buffer in a single go, let's - // use zero-copy. There's no disadvantage to it as we cannot - // stuck multiple messages into the buffer anyway. Note that - // subsequent write(s) are non-blocking, thus each single - // write writes at most SO_SNDBUF bytes at once not depending - // on how large is the chunk returned from here. + // If there are no data in the buffer yet and we are able to + // fill whole buffer in a single go, let's use zero-copy. + // There's no disadvantage to it as we cannot stuck multiple + // messages into the buffer anyway. Note that subsequent + // write(s) are non-blocking, thus each single write writes + // at most SO_SNDBUF bytes at once not depending on how large + // is the chunk returned from here. // As a consequence, large messages being sent won't block // other engines running in the same I/O thread for excessive // amounts of time. - if (pos == 0 && to_write >= *size_) { + if (!pos && to_write >= bufsize) { *data_ = write_pos; - write_pos += to_write; - pos = to_write; + *size_ = to_write; + write_pos = NULL; to_write = 0; - break; + return; } - if (to_write) { - - size_t to_copy = std::min (to_write, *size_ - pos); - memcpy (*data_ + pos, write_pos, to_copy); - pos += to_copy; - write_pos += to_copy; - to_write -= to_copy; - } - else { - bool more = (static_cast <T*> (this)->*next) (); - if (beginning && offset == -1) { - offset = pos; - beginning = false; - } - if (!more) - break; + // Copy data to the buffer. If the buffer is full, return. + size_t to_copy = std::min (to_write, bufsize - pos); + memcpy (buf + pos, write_pos, to_copy); + pos += to_copy; + write_pos += to_copy; + to_write -= to_copy; + if (pos == bufsize) { + *data_ = buf; + *size_ = pos; + return; } } - - // Return offset of the first message in the buffer. - if (offset_) - *offset_ = offset; - - // Return the size of the filled-in portion of the buffer. - *size_ = pos; } + protected: // Prototype of state machine action. @@ -121,6 +137,9 @@ namespace zmq step_t next; bool beginning; + size_t bufsize; + unsigned char *buf; + encoder_t (const encoder_t&); void operator = (const encoder_t&); }; diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index 8040f21..f488272 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -22,7 +22,8 @@ #include "wire.hpp" #include "err.hpp" -zmq::zmq_decoder_t::zmq_decoder_t () : +zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) : + decoder_t <zmq_decoder_t> (bufsize_), destination (NULL) { zmq_msg_init (&in_progress); diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp index 59c8671..c5433b7 100644 --- a/src/zmq_decoder.hpp +++ b/src/zmq_decoder.hpp @@ -32,7 +32,7 @@ namespace zmq { public: - zmq_decoder_t (); + zmq_decoder_t (size_t bufsize_); ~zmq_decoder_t (); void set_inout (struct i_inout *destination_); diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index 180bda7..cf129e5 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -21,7 +21,8 @@ #include "i_inout.hpp" #include "wire.hpp" -zmq::zmq_encoder_t::zmq_encoder_t () : +zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) : + encoder_t <zmq_encoder_t> (bufsize_), source (NULL) { zmq_msg_init (&in_progress); diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp index 102c434..825e60f 100644 --- a/src/zmq_encoder.hpp +++ b/src/zmq_encoder.hpp @@ -32,7 +32,7 @@ namespace zmq { public: - zmq_encoder_t (); + zmq_encoder_t (size_t bufsize_); ~zmq_encoder_t (); void set_inout (struct i_inout *source_); diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index e8c9889..d474f4e 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -27,21 +27,15 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, const options_t &options_) : io_object_t (parent_), - inbuf (NULL), + inpos (NULL), insize (0), - inpos (0), - outbuf (NULL), + decoder (in_batch_size), + outpos (NULL), outsize (0), - outpos (0), + encoder (out_batch_size), inout (NULL), options (options_) { - // Allocate read & write buffer. - inbuf_storage = (unsigned char*) malloc (in_batch_size); - zmq_assert (inbuf_storage); - outbuf_storage = (unsigned char*) malloc (out_batch_size); - zmq_assert (outbuf_storage); - // Initialise the underlying socket. int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf); zmq_assert (rc == 0); @@ -49,8 +43,6 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, zmq::zmq_engine_t::~zmq_engine_t () { - free (outbuf_storage); - free (inbuf_storage); } void zmq::zmq_engine_t::plug (i_inout *inout_) @@ -80,13 +72,12 @@ void zmq::zmq_engine_t::unplug () void zmq::zmq_engine_t::in_event () { - // If there's no data to process in the buffer, read new data. - if (inpos == insize) { + // If there's no data to process in the buffer... + if (!insize) { - // Read as much data as possible to the read buffer. - inbuf = inbuf_storage; - insize = tcp_socket.read (inbuf, in_batch_size); - inpos = 0; + // Retrieve the buffer and read as much data as possible. + decoder.get_buffer (&inpos, &insize); + insize = tcp_socket.read (inpos, insize); // Check whether the peer has closed the connection. if (insize == (size_t) -1) { @@ -96,15 +87,15 @@ void zmq::zmq_engine_t::in_event () } } - // Following code should be executed even if there's not a single byte in - // the buffer. There still can be a decoded messages stored in the decoder. - // Push the data to the decoder. - int nbytes = decoder.write (inbuf + inpos, insize - inpos); + size_t processed = decoder.process_buffer (inpos, insize); + + // Adjust the buffer. + inpos += processed; + insize -= processed; - // Adjust read position. Stop polling for input if we got stuck. - inpos += nbytes; - if (inpos < insize) + // Stop polling for input if we got stuck. + if (processed < insize) reset_pollin (handle); // Flush all messages the decoder may have produced. @@ -114,31 +105,28 @@ void zmq::zmq_engine_t::in_event () void zmq::zmq_engine_t::out_event () { // If write buffer is empty, try to read new data from the encoder. - if (outpos == outsize) { - - outbuf = outbuf_storage; - outsize = out_batch_size; - encoder.read (&outbuf, &outsize); - outpos = 0; - + if (!outsize) { + encoder.get_buffer (&outpos, &outsize); + // If there is no data to send, stop polling for output. - if (outsize == 0) + if (outsize == 0) { reset_pollout (handle); + return; + } } // If there are any data to write in write buffer, write as much as // possible to the socket. - if (outpos < outsize) { - int nbytes = tcp_socket.write (outbuf + outpos, outsize - outpos); - - // Handle problems with the connection. - if (nbytes == -1) { - error (); - return; - } + int nbytes = tcp_socket.write (outpos, outsize); - outpos += nbytes; + // Handle problems with the connection. + if (nbytes == -1) { + error (); + return; } + + outpos += nbytes; + outsize -= nbytes; } void zmq::zmq_engine_t::revive () diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index c842da7..8d9e4b9 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -57,21 +57,16 @@ namespace zmq tcp_socket_t tcp_socket; handle_t handle; - unsigned char *inbuf_storage; - unsigned char *inbuf; + unsigned char *inpos; size_t insize; - size_t inpos; + zmq_decoder_t decoder; - unsigned char *outbuf_storage; - unsigned char *outbuf; + unsigned char *outpos; size_t outsize; - size_t outpos; + zmq_encoder_t encoder; i_inout *inout; - zmq_encoder_t encoder; - zmq_decoder_t decoder; - options_t options; zmq_engine_t (const zmq_engine_t&); |