summaryrefslogtreecommitdiff
path: root/src/zmq_decoder.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-12-15 09:09:19 +0100
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-12-15 09:09:19 +0100
commite49115224a7957b0e5d49326bc02ae6af186eaf9 (patch)
tree81d1ca0ea496004bbc85cec9b3289af96cdaa197 /src/zmq_decoder.cpp
parentbd792faa9d6c78c375dbc52c6d773e157335da36 (diff)
zmq_encoder/decoder are able to add/trim prefixes from messages; fair queueing and load balancing algorithms factorised into separate classes
Diffstat (limited to 'src/zmq_decoder.cpp')
-rw-r--r--src/zmq_decoder.cpp40
1 files changed, 33 insertions, 7 deletions
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
index f488272..b9617fc 100644
--- a/src/zmq_decoder.cpp
+++ b/src/zmq_decoder.cpp
@@ -17,23 +17,41 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <stdlib.h>
+#include <string.h>
+
#include "zmq_decoder.hpp"
#include "i_inout.hpp"
#include "wire.hpp"
#include "err.hpp"
-zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
+zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_,
+ void *prefix_, size_t prefix_size_) :
decoder_t <zmq_decoder_t> (bufsize_),
destination (NULL)
{
zmq_msg_init (&in_progress);
+ if (!prefix_) {
+ prefix = NULL;
+ prefix_size = 0;
+ }
+ else {
+ prefix = malloc (prefix_size_);
+ zmq_assert (prefix);
+ memcpy (prefix, prefix_, prefix_size_);
+ prefix_size = prefix_size_;
+ }
+
// At the beginning, read one byte and go to one_byte_size_ready state.
next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready);
}
zmq::zmq_decoder_t::~zmq_decoder_t ()
{
+ if (prefix)
+ free (prefix);
+
zmq_msg_close (&in_progress);
}
@@ -55,11 +73,15 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- int rc = zmq_msg_init_size (&in_progress, *tmpbuf);
+ int rc = zmq_msg_init_size (&in_progress, prefix_size + *tmpbuf);
errno_assert (rc == 0);
- next_step (zmq_msg_data (&in_progress), *tmpbuf,
- &zmq_decoder_t::message_ready);
+ // Fill in the message prefix if any.
+ if (prefix)
+ memcpy (zmq_msg_data (&in_progress), prefix, prefix_size);
+
+ next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size,
+ *tmpbuf, &zmq_decoder_t::message_ready);
}
return true;
}
@@ -74,11 +96,15 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- int rc = zmq_msg_init_size (&in_progress, size);
+ int rc = zmq_msg_init_size (&in_progress, prefix_size + size);
errno_assert (rc == 0);
- next_step (zmq_msg_data (&in_progress), size,
- &zmq_decoder_t::message_ready);
+ // Fill in the message prefix if any.
+ if (prefix)
+ memcpy (zmq_msg_data (&in_progress), prefix, prefix_size);
+
+ next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size ,
+ size, &zmq_decoder_t::message_ready);
return true;
}