summaryrefslogtreecommitdiff
path: root/src/zmq_engine.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zmq_engine.cpp')
-rw-r--r--src/zmq_engine.cpp72
1 files changed, 30 insertions, 42 deletions
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index e8c9889..d474f4e 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -27,21 +27,15 @@
zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
const options_t &options_) :
io_object_t (parent_),
- inbuf (NULL),
+ inpos (NULL),
insize (0),
- inpos (0),
- outbuf (NULL),
+ decoder (in_batch_size),
+ outpos (NULL),
outsize (0),
- outpos (0),
+ encoder (out_batch_size),
inout (NULL),
options (options_)
{
- // Allocate read & write buffer.
- inbuf_storage = (unsigned char*) malloc (in_batch_size);
- zmq_assert (inbuf_storage);
- outbuf_storage = (unsigned char*) malloc (out_batch_size);
- zmq_assert (outbuf_storage);
-
// Initialise the underlying socket.
int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf);
zmq_assert (rc == 0);
@@ -49,8 +43,6 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
zmq::zmq_engine_t::~zmq_engine_t ()
{
- free (outbuf_storage);
- free (inbuf_storage);
}
void zmq::zmq_engine_t::plug (i_inout *inout_)
@@ -80,13 +72,12 @@ void zmq::zmq_engine_t::unplug ()
void zmq::zmq_engine_t::in_event ()
{
- // If there's no data to process in the buffer, read new data.
- if (inpos == insize) {
+ // If there's no data to process in the buffer...
+ if (!insize) {
- // Read as much data as possible to the read buffer.
- inbuf = inbuf_storage;
- insize = tcp_socket.read (inbuf, in_batch_size);
- inpos = 0;
+ // Retrieve the buffer and read as much data as possible.
+ decoder.get_buffer (&inpos, &insize);
+ insize = tcp_socket.read (inpos, insize);
// Check whether the peer has closed the connection.
if (insize == (size_t) -1) {
@@ -96,15 +87,15 @@ void zmq::zmq_engine_t::in_event ()
}
}
- // Following code should be executed even if there's not a single byte in
- // the buffer. There still can be a decoded messages stored in the decoder.
-
// Push the data to the decoder.
- int nbytes = decoder.write (inbuf + inpos, insize - inpos);
+ size_t processed = decoder.process_buffer (inpos, insize);
+
+ // Adjust the buffer.
+ inpos += processed;
+ insize -= processed;
- // Adjust read position. Stop polling for input if we got stuck.
- inpos += nbytes;
- if (inpos < insize)
+ // Stop polling for input if we got stuck.
+ if (processed < insize)
reset_pollin (handle);
// Flush all messages the decoder may have produced.
@@ -114,31 +105,28 @@ void zmq::zmq_engine_t::in_event ()
void zmq::zmq_engine_t::out_event ()
{
// If write buffer is empty, try to read new data from the encoder.
- if (outpos == outsize) {
-
- outbuf = outbuf_storage;
- outsize = out_batch_size;
- encoder.read (&outbuf, &outsize);
- outpos = 0;
-
+ if (!outsize) {
+ encoder.get_buffer (&outpos, &outsize);
+
// If there is no data to send, stop polling for output.
- if (outsize == 0)
+ if (outsize == 0) {
reset_pollout (handle);
+ return;
+ }
}
// If there are any data to write in write buffer, write as much as
// possible to the socket.
- if (outpos < outsize) {
- int nbytes = tcp_socket.write (outbuf + outpos, outsize - outpos);
-
- // Handle problems with the connection.
- if (nbytes == -1) {
- error ();
- return;
- }
+ int nbytes = tcp_socket.write (outpos, outsize);
- outpos += nbytes;
+ // Handle problems with the connection.
+ if (nbytes == -1) {
+ error ();
+ return;
}
+
+ outpos += nbytes;
+ outsize -= nbytes;
}
void zmq::zmq_engine_t::revive ()