From 96ccc1c5fceb56bd7ffc2e6bef9ddab5347d722b Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 9 Mar 2010 15:10:44 +0100 Subject: 'flags' fields added to the wire format --- src/zmq_decoder.cpp | 40 +++++++++++++++++++++++++++++----------- src/zmq_decoder.hpp | 1 + src/zmq_encoder.cpp | 11 ++++++++--- src/zmq_encoder.hpp | 2 +- 4 files changed, 39 insertions(+), 15 deletions(-) (limited to 'src') diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index b1776df..34bd618 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -60,25 +60,24 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () else { // TODO: Handle over-sized message decently. + // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... if (prefix.empty ()) { - int rc = zmq_msg_init_size (&in_progress, *tmpbuf); + int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1); errno_assert (rc == 0); - next_step (zmq_msg_data (&in_progress), *tmpbuf, - &zmq_decoder_t::message_ready); + } else { int rc = zmq_msg_init_size (&in_progress, - *tmpbuf + 1 + prefix.size ()); + 1 + prefix.size () + *tmpbuf - 1); errno_assert (rc == 0); unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress); *data = (unsigned char) prefix.size (); memcpy (data + 1, prefix.data (), *data); - next_step (data + *data + 1, *tmpbuf, - &zmq_decoder_t::message_ready); } + next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready); } return true; } @@ -90,22 +89,41 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () size_t size = (size_t) get_uint64 (tmpbuf); // TODO: Handle over-sized message decently. + // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... if (prefix.empty ()) { - int rc = zmq_msg_init_size (&in_progress, size); + int rc = zmq_msg_init_size (&in_progress, size - 1); errno_assert (rc == 0); - next_step (zmq_msg_data (&in_progress), size, - &zmq_decoder_t::message_ready); } else { - int rc = zmq_msg_init_size (&in_progress, size + 1 + prefix.size ()); + int rc = zmq_msg_init_size (&in_progress, + 1 + prefix.size () + size - 1); errno_assert (rc == 0); unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress); *data = (unsigned char) prefix.size (); memcpy (data + 1, prefix.data (), *data); - next_step (data + *data + 1, size, &zmq_decoder_t::message_ready); + } + next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready); + + return true; +} + +bool zmq::zmq_decoder_t::flags_ready () +{ + // No flags are accepted at the moment. + zmq_assert (tmpbuf [0] == 0); + + if (prefix.empty ()) { + next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), + &zmq_decoder_t::message_ready); + } + else { + next_step ((unsigned char*) zmq_msg_data (&in_progress) + + prefix.size () + 1, + zmq_msg_size (&in_progress) - prefix.size () - 1, + &zmq_decoder_t::message_ready); } return true; diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp index 11ee6c2..5a9580a 100644 --- a/src/zmq_decoder.hpp +++ b/src/zmq_decoder.hpp @@ -48,6 +48,7 @@ namespace zmq bool one_byte_size_ready (); bool eight_byte_size_ready (); + bool flags_ready (); bool message_ready (); struct i_inout *destination; diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index 68626fa..95d0e5e 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -89,17 +89,22 @@ bool zmq::zmq_encoder_t::message_ready () size -= prefix_size; } + // Account for the 'flags' byte. + size++; + // For messages less than 255 bytes long, write one byte of message size. // For longer messages write 0xff escape character followed by 8-byte - // message size. + // message size. In both cases empty 'flags' field follows. if (size < 255) { tmpbuf [0] = (unsigned char) size; - next_step (tmpbuf, 1, &zmq_encoder_t::size_ready, true); + tmpbuf [1] = 0; + next_step (tmpbuf, 2, &zmq_encoder_t::size_ready, true); } else { tmpbuf [0] = 0xff; put_uint64 (tmpbuf + 1, size); - next_step (tmpbuf, 9, &zmq_encoder_t::size_ready, true); + tmpbuf [9] = 0; + next_step (tmpbuf, 10, &zmq_encoder_t::size_ready, true); } return true; } diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp index 0e23fcd..953012b 100644 --- a/src/zmq_encoder.hpp +++ b/src/zmq_encoder.hpp @@ -48,7 +48,7 @@ namespace zmq struct i_inout *source; ::zmq_msg_t in_progress; - unsigned char tmpbuf [9]; + unsigned char tmpbuf [10]; bool trim; -- cgit v1.2.3