From 36a576370ccfed3c104850b5b95a6ed3870edbea Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 12 Feb 2010 15:57:54 +0100 Subject: Multi-hop REQ/REP, part I., tracerouting switched on on XREP socket --- src/i_engine.hpp | 8 ++++++++ src/pgm_receiver.cpp | 9 ++++++++- src/pgm_receiver.hpp | 1 + src/pgm_sender.cpp | 9 ++++++++- src/pgm_sender.hpp | 1 + src/zmq_decoder.cpp | 27 +++++++++++++-------------- src/zmq_decoder.hpp | 6 +++++- src/zmq_encoder.cpp | 13 +++++++++---- src/zmq_encoder.hpp | 8 ++++++-- src/zmq_engine.cpp | 11 +++++++++-- src/zmq_engine.hpp | 1 + src/zmq_init.cpp | 6 ++++++ 12 files changed, 75 insertions(+), 25 deletions(-) diff --git a/src/i_engine.hpp b/src/i_engine.hpp index d5d7811..bcb4297 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -20,6 +20,8 @@ #ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__ #define __ZMQ_I_ENGINE_HPP_INCLUDED__ +#include + namespace zmq { @@ -36,6 +38,12 @@ namespace zmq // This method is called by the session to signalise that there // are messages to send available. virtual void revive () = 0; + + // Start tracing the message route. Engine should add the identity + // supplied to all inbound messages and trim identity from all the + // outbound messages. + virtual void traceroute (unsigned char *identity_, + size_t identity_size_) = 0; }; } diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index d0310cc..a2ba9c6 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -88,6 +88,13 @@ void zmq::pgm_receiver_t::revive () zmq_assert (false); } +void zmq::pgm_receiver_t::traceroute (unsigned char *identity_, + size_t identity_size_) +{ + // No need for tracerouting functionality in PGM socket at the moment. + zmq_assert (false); +} + void zmq::pgm_receiver_t::in_event () { // Read data from the underlying pgm_socket. @@ -151,7 +158,7 @@ void zmq::pgm_receiver_t::in_event () it->second.joined = true; // Create and connect decoder for the peer. - it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0); + it->second.decoder = new (std::nothrow) zmq_decoder_t (0); it->second.decoder->set_inout (inout); } diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 3d6a212..f03551f 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -54,6 +54,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); + void traceroute (unsigned char *identity_, size_t identity_size_); // i_poll_events interface implementation. void in_event (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 3f08d8e..fa7d7e0 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -36,7 +36,7 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, const options_t &options_) : io_object_t (parent_), - encoder (0, false), + encoder (0), pgm_socket (false, options_), options (options_), out_buffer (NULL), @@ -102,6 +102,13 @@ void zmq::pgm_sender_t::revive () out_event (); } +void zmq::pgm_sender_t::traceroute (unsigned char *identity_, + size_t identity_size_) +{ + // No need for tracerouting functionality in PGM socket at the moment. + zmq_assert (false); +} + zmq::pgm_sender_t::~pgm_sender_t () { if (out_buffer) { diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 43adb8b..89357f5 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -52,6 +52,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); + void traceroute (unsigned char *identity_, size_t identity_size_); // i_poll_events interface implementation. void in_event (); diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index cc4f846..f502ffd 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -25,24 +25,14 @@ #include "wire.hpp" #include "err.hpp" -zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_, - void *prefix_, size_t prefix_size_) : +zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) : decoder_t (bufsize_), - destination (NULL) + destination (NULL), + prefix (NULL), + prefix_size (0) { zmq_msg_init (&in_progress); - if (!prefix_) { - prefix = NULL; - prefix_size = 0; - } - else { - prefix = malloc (prefix_size_); - zmq_assert (prefix); - memcpy (prefix, prefix_, prefix_size_); - prefix_size = prefix_size_; - } - // At the beginning, read one byte and go to one_byte_size_ready state. next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); } @@ -60,6 +50,15 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_) destination = destination_; } +void zmq::zmq_decoder_t::add_prefix (unsigned char *prefix_, + size_t prefix_size_) +{ + prefix = malloc (prefix_size_); + zmq_assert (prefix); + memcpy (prefix, prefix_, prefix_size_); + prefix_size = prefix_size_; +} + bool zmq::zmq_decoder_t::one_byte_size_ready () { // First byte of size is read. If it is 0xff read 8-byte size. diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp index 5d9133b..dfabece 100644 --- a/src/zmq_decoder.hpp +++ b/src/zmq_decoder.hpp @@ -34,11 +34,15 @@ namespace zmq // If prefix is not NULL, it will be glued to the beginning of every // decoded message. - zmq_decoder_t (size_t bufsize_, void *prefix_, size_t prefix_size_); + zmq_decoder_t (size_t bufsize_); ~zmq_decoder_t (); void set_inout (struct i_inout *destination_); + // Once called, all decoded messages will be prefixed by the specified + // prefix. + void add_prefix (unsigned char *prefix_, size_t prefix_size_); + private: bool one_byte_size_ready (); diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index 89649ef..a60fe5e 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -21,10 +21,10 @@ #include "i_inout.hpp" #include "wire.hpp" -zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_, bool trim_prefix_) : +zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) : encoder_t (bufsize_), source (NULL), - trim_prefix (trim_prefix_) + trim (false) { zmq_msg_init (&in_progress); @@ -42,10 +42,15 @@ void zmq::zmq_encoder_t::set_inout (i_inout *source_) source = source_; } +void zmq::zmq_encoder_t::trim_prefix () +{ + trim = true; +} + bool zmq::zmq_encoder_t::size_ready () { // Write message body into the buffer. - if (!trim_prefix) { + if (!trim) { next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), &zmq_encoder_t::message_ready, false); } @@ -75,7 +80,7 @@ bool zmq::zmq_encoder_t::message_ready () // Get the message size. If the prefix is not to be sent, adjust the // size accordingly. size_t size = zmq_msg_size (&in_progress); - if (trim_prefix) + if (trim) size -= *(unsigned char*) zmq_msg_data (&in_progress); // For messages less than 255 bytes long, write one byte of message size. diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp index 8eedef8..0e23fcd 100644 --- a/src/zmq_encoder.hpp +++ b/src/zmq_encoder.hpp @@ -32,11 +32,15 @@ namespace zmq { public: - zmq_encoder_t (size_t bufsize_, bool trim_prefix_); + zmq_encoder_t (size_t bufsize_); ~zmq_encoder_t (); void set_inout (struct i_inout *source_); + // Once called, encoder will start trimming frefixes from outbound + // messages. + void trim_prefix (); + private: bool size_ready (); @@ -46,7 +50,7 @@ namespace zmq ::zmq_msg_t in_progress; unsigned char tmpbuf [9]; - bool trim_prefix; + bool trim; zmq_encoder_t (const zmq_encoder_t&); void operator = (const zmq_encoder_t&); diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 01fe98c..bda098c 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -32,10 +32,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, io_object_t (parent_), inpos (NULL), insize (0), - decoder (in_batch_size, NULL, 0), + decoder (in_batch_size), outpos (NULL), outsize (0), - encoder (out_batch_size, false), + encoder (out_batch_size), inout (NULL), options (options_), reconnect (reconnect_) @@ -160,6 +160,13 @@ void zmq::zmq_engine_t::revive () out_event (); } +void zmq::zmq_engine_t::traceroute (unsigned char *identity_, + size_t identity_size_) +{ + encoder.trim_prefix (); + decoder.add_prefix (identity_, identity_size_); +} + void zmq::zmq_engine_t::error () { zmq_assert (inout); diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index ddd0931..174dd1a 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -47,6 +47,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); + void traceroute (unsigned char *identity_, size_t identity_size_); // i_poll_events interface implementation. void in_event (); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index 37aafe6..9492caa 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -76,6 +76,12 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) zmq_msg_size (msg_)); received = true; + // Once the initial handshaking is over, XREP sockets should start + // tracerouting individual messages. + if (options.type == ZMQ_XREP) + engine->traceroute ((unsigned char*) peer_identity.data (), + peer_identity.size ()); + return true; } -- cgit v1.2.3