From e49115224a7957b0e5d49326bc02ae6af186eaf9 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 15 Dec 2009 09:09:19 +0100 Subject: zmq_encoder/decoder are able to add/trim prefixes from messages; fair queueing and load balancing algorithms factorised into separate classes --- src/zmq_decoder.cpp | 40 +++++++++++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 7 deletions(-) (limited to 'src/zmq_decoder.cpp') diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index f488272..b9617fc 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -17,23 +17,41 @@ along with this program. If not, see . */ +#include +#include + #include "zmq_decoder.hpp" #include "i_inout.hpp" #include "wire.hpp" #include "err.hpp" -zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) : +zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_, + void *prefix_, size_t prefix_size_) : decoder_t (bufsize_), destination (NULL) { zmq_msg_init (&in_progress); + if (!prefix_) { + prefix = NULL; + prefix_size = 0; + } + else { + prefix = malloc (prefix_size_); + zmq_assert (prefix); + memcpy (prefix, prefix_, prefix_size_); + prefix_size = prefix_size_; + } + // At the beginning, read one byte and go to one_byte_size_ready state. next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); } zmq::zmq_decoder_t::~zmq_decoder_t () { + if (prefix) + free (prefix); + zmq_msg_close (&in_progress); } @@ -55,11 +73,15 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - int rc = zmq_msg_init_size (&in_progress, *tmpbuf); + int rc = zmq_msg_init_size (&in_progress, prefix_size + *tmpbuf); errno_assert (rc == 0); - next_step (zmq_msg_data (&in_progress), *tmpbuf, - &zmq_decoder_t::message_ready); + // Fill in the message prefix if any. + if (prefix) + memcpy (zmq_msg_data (&in_progress), prefix, prefix_size); + + next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size, + *tmpbuf, &zmq_decoder_t::message_ready); } return true; } @@ -74,11 +96,15 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - int rc = zmq_msg_init_size (&in_progress, size); + int rc = zmq_msg_init_size (&in_progress, prefix_size + size); errno_assert (rc == 0); - next_step (zmq_msg_data (&in_progress), size, - &zmq_decoder_t::message_ready); + // Fill in the message prefix if any. + if (prefix) + memcpy (zmq_msg_data (&in_progress), prefix, prefix_size); + + next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size , + size, &zmq_decoder_t::message_ready); return true; } -- cgit v1.2.3