From e49115224a7957b0e5d49326bc02ae6af186eaf9 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 15 Dec 2009 09:09:19 +0100 Subject: zmq_encoder/decoder are able to add/trim prefixes from messages; fair queueing and load balancing algorithms factorised into separate classes --- src/zmq_encoder.cpp | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) (limited to 'src/zmq_encoder.cpp') diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index cf129e5..4824cd1 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -21,9 +21,10 @@ #include "i_inout.hpp" #include "wire.hpp" -zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) : +zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_, bool trim_prefix_) : encoder_t (bufsize_), - source (NULL) + source (NULL), + trim_prefix (trim_prefix_) { zmq_msg_init (&in_progress); @@ -44,8 +45,16 @@ void zmq::zmq_encoder_t::set_inout (i_inout *source_) bool zmq::zmq_encoder_t::size_ready () { // Write message body into the buffer. - next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), - &zmq_encoder_t::message_ready, false); + if (!trim_prefix) { + next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), + &zmq_encoder_t::message_ready, false); + } + else { + size_t prefix_size = *(unsigned char*) zmq_msg_data (&in_progress); + next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size, + zmq_msg_size (&in_progress) - prefix_size, + &zmq_encoder_t::message_ready, false); + } return true; } @@ -63,7 +72,11 @@ bool zmq::zmq_encoder_t::message_ready () return false; } + // Get the message size. If the prefix is not to be sent, adjust the + // size accordingly. size_t size = zmq_msg_size (&in_progress); + if (trim_prefix) + size -= *(unsigned char*) zmq_msg_data (&in_progress); // For messages less than 255 bytes long, write one byte of message size. // For longer messages write 0xff escape character followed by 8-byte -- cgit v1.2.3