diff options
| -rw-r--r-- | src/downstream.cpp | 1 | ||||
| -rw-r--r-- | src/i_engine.hpp | 8 | ||||
| -rw-r--r-- | src/options.cpp | 1 | ||||
| -rw-r--r-- | src/options.hpp | 3 | ||||
| -rw-r--r-- | src/p2p.cpp | 1 | ||||
| -rw-r--r-- | src/pgm_receiver.cpp | 9 | ||||
| -rw-r--r-- | src/pgm_receiver.hpp | 1 | ||||
| -rw-r--r-- | src/pgm_sender.cpp | 9 | ||||
| -rw-r--r-- | src/pgm_sender.hpp | 1 | ||||
| -rw-r--r-- | src/pub.cpp | 1 | ||||
| -rw-r--r-- | src/rep.cpp | 1 | ||||
| -rw-r--r-- | src/req.cpp | 1 | ||||
| -rw-r--r-- | src/sub.cpp | 1 | ||||
| -rw-r--r-- | src/upstream.cpp | 1 | ||||
| -rw-r--r-- | src/xrep.cpp | 1 | ||||
| -rw-r--r-- | src/xreq.cpp | 1 | ||||
| -rw-r--r-- | src/zmq_decoder.cpp | 27 | ||||
| -rw-r--r-- | src/zmq_decoder.hpp | 6 | ||||
| -rw-r--r-- | src/zmq_encoder.cpp | 13 | ||||
| -rw-r--r-- | src/zmq_encoder.hpp | 8 | ||||
| -rw-r--r-- | src/zmq_engine.cpp | 11 | ||||
| -rw-r--r-- | src/zmq_engine.hpp | 1 | ||||
| -rw-r--r-- | src/zmq_init.cpp | 6 | 
23 files changed, 88 insertions, 25 deletions
diff --git a/src/downstream.cpp b/src/downstream.cpp index 2da08e3..29b0689 100644 --- a/src/downstream.cpp +++ b/src/downstream.cpp @@ -26,6 +26,7 @@  zmq::downstream_t::downstream_t (class app_thread_t *parent_) :      socket_base_t (parent_)  { +    options.type = ZMQ_DOWNSTREAM;      options.requires_in = false;      options.requires_out = true;  } diff --git a/src/i_engine.hpp b/src/i_engine.hpp index d5d7811..bcb4297 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -20,6 +20,8 @@  #ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__  #define __ZMQ_I_ENGINE_HPP_INCLUDED__ +#include <stddef.h> +  namespace zmq  { @@ -36,6 +38,12 @@ namespace zmq          //  This method is called by the session to signalise that there          //  are messages to send available.          virtual void revive () = 0; + +        //  Start tracing the message route. Engine should add the identity +        //  supplied to all inbound messages and trim identity from all the +        //  outbound messages. +        virtual void traceroute (unsigned char *identity_, +            size_t identity_size_) = 0;      };  } diff --git a/src/options.cpp b/src/options.cpp index cc02798..f9d93d6 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -23,6 +23,7 @@  #include "err.hpp"  zmq::options_t::options_t () : +    type (-1),      hwm (0),      lwm (0),      swap (0), diff --git a/src/options.hpp b/src/options.hpp index b066d48..dbe3701 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -34,6 +34,9 @@ namespace zmq          int setsockopt (int option_, const void *optval_, size_t optvallen_); +        //  Type of the associated socket. One of the constants defined in zmq.h +        int type; +          int64_t hwm;          int64_t lwm;          int64_t swap; diff --git a/src/p2p.cpp b/src/p2p.cpp index 72bc26b..46bbd0b 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -29,6 +29,7 @@ zmq::p2p_t::p2p_t (class app_thread_t *parent_) :      outpipe (NULL),      alive (true)  { +    options.type = ZMQ_P2P;      options.requires_in = true;      options.requires_out = true;  } diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index d0310cc..a2ba9c6 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -88,6 +88,13 @@ void zmq::pgm_receiver_t::revive ()      zmq_assert (false);  } +void zmq::pgm_receiver_t::traceroute (unsigned char *identity_, +    size_t identity_size_) +{ +    //  No need for tracerouting functionality in PGM socket at the moment. +    zmq_assert (false); +} +  void zmq::pgm_receiver_t::in_event ()  {      // Read data from the underlying pgm_socket. @@ -151,7 +158,7 @@ void zmq::pgm_receiver_t::in_event ()              it->second.joined = true;              //  Create and connect decoder for the peer. -            it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0); +            it->second.decoder = new (std::nothrow) zmq_decoder_t (0);              it->second.decoder->set_inout (inout);          } diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 3d6a212..f03551f 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -54,6 +54,7 @@ namespace zmq          void plug (struct i_inout *inout_);          void unplug ();          void revive (); +        void traceroute (unsigned char *identity_, size_t identity_size_);          //  i_poll_events interface implementation.          void in_event (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 3f08d8e..fa7d7e0 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -36,7 +36,7 @@  zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,         const options_t &options_) :      io_object_t (parent_), -    encoder (0, false), +    encoder (0),      pgm_socket (false, options_),      options (options_),      out_buffer (NULL), @@ -102,6 +102,13 @@ void zmq::pgm_sender_t::revive ()      out_event ();  } +void zmq::pgm_sender_t::traceroute (unsigned char *identity_, +    size_t identity_size_) +{ +    //  No need for tracerouting functionality in PGM socket at the moment. +    zmq_assert (false); +} +  zmq::pgm_sender_t::~pgm_sender_t ()  {      if (out_buffer) { diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 43adb8b..89357f5 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -52,6 +52,7 @@ namespace zmq          void plug (struct i_inout *inout_);          void unplug ();          void revive (); +        void traceroute (unsigned char *identity_, size_t identity_size_);          //  i_poll_events interface implementation.          void in_event (); diff --git a/src/pub.cpp b/src/pub.cpp index 05bfdcf..9a2dcc6 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -27,6 +27,7 @@  zmq::pub_t::pub_t (class app_thread_t *parent_) :      socket_base_t (parent_)  { +    options.type = ZMQ_PUB;      options.requires_in = false;      options.requires_out = true;  } diff --git a/src/rep.cpp b/src/rep.cpp index b6bffae..b7685b4 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -30,6 +30,7 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) :      waiting_for_reply (false),      reply_pipe (NULL)  { +    options.type = ZMQ_REP;      options.requires_in = true;      options.requires_out = true;  } diff --git a/src/req.cpp b/src/req.cpp index c9240e0..9b1766e 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -30,6 +30,7 @@ zmq::req_t::req_t (class app_thread_t *parent_) :      reply_pipe_active (false),      reply_pipe (NULL)  { +    options.type = ZMQ_REQ;      options.requires_in = true;      options.requires_out = true;  } diff --git a/src/sub.cpp b/src/sub.cpp index 06ed896..31ee222 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -28,6 +28,7 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) :      socket_base_t (parent_),      has_message (false)  { +    options.type = ZMQ_SUB;      options.requires_in = true;      options.requires_out = false;      zmq_msg_init (&message); diff --git a/src/upstream.cpp b/src/upstream.cpp index bdcd5ef..390dcbe 100644 --- a/src/upstream.cpp +++ b/src/upstream.cpp @@ -25,6 +25,7 @@  zmq::upstream_t::upstream_t (class app_thread_t *parent_) :      socket_base_t (parent_)  { +    options.type = ZMQ_UPSTREAM;      options.requires_in = true;      options.requires_out = false;  } diff --git a/src/xrep.cpp b/src/xrep.cpp index 4cba21c..67a9a39 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -25,6 +25,7 @@  zmq::xrep_t::xrep_t (class app_thread_t *parent_) :      socket_base_t (parent_)  { +    options.type = ZMQ_XREP;      options.requires_in = true;      options.requires_out = true;  } diff --git a/src/xreq.cpp b/src/xreq.cpp index a4310f8..691b66e 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -25,6 +25,7 @@  zmq::xreq_t::xreq_t (class app_thread_t *parent_) :      socket_base_t (parent_)  { +    options.type = ZMQ_REQ;      options.requires_in = true;      options.requires_out = true;  } diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index cc4f846..f502ffd 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -25,24 +25,14 @@  #include "wire.hpp"  #include "err.hpp" -zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_, -      void *prefix_, size_t prefix_size_) : +zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :      decoder_t <zmq_decoder_t> (bufsize_), -    destination (NULL) +    destination (NULL), +    prefix (NULL), +    prefix_size (0)  {      zmq_msg_init (&in_progress); -    if (!prefix_) { -        prefix = NULL; -        prefix_size = 0; -    } -    else { -        prefix = malloc (prefix_size_); -        zmq_assert (prefix); -        memcpy (prefix, prefix_, prefix_size_); -        prefix_size = prefix_size_; -    } -      //  At the beginning, read one byte and go to one_byte_size_ready state.      next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready);  } @@ -60,6 +50,15 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_)      destination = destination_;  } +void zmq::zmq_decoder_t::add_prefix (unsigned char *prefix_, +    size_t prefix_size_) +{ +    prefix = malloc (prefix_size_); +    zmq_assert (prefix); +    memcpy (prefix, prefix_, prefix_size_); +    prefix_size = prefix_size_; +} +  bool zmq::zmq_decoder_t::one_byte_size_ready ()  {      //  First byte of size is read. If it is 0xff read 8-byte size. diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp index 5d9133b..dfabece 100644 --- a/src/zmq_decoder.hpp +++ b/src/zmq_decoder.hpp @@ -34,11 +34,15 @@ namespace zmq          //  If prefix is not NULL, it will be glued to the beginning of every          //  decoded message. -        zmq_decoder_t (size_t bufsize_, void *prefix_, size_t prefix_size_); +        zmq_decoder_t (size_t bufsize_);          ~zmq_decoder_t ();          void set_inout (struct i_inout *destination_); +        //  Once called, all decoded messages will be prefixed by the specified +        //  prefix. +        void add_prefix (unsigned char *prefix_, size_t prefix_size_); +      private:          bool one_byte_size_ready (); diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index 89649ef..a60fe5e 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -21,10 +21,10 @@  #include "i_inout.hpp"  #include "wire.hpp" -zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_, bool trim_prefix_) : +zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) :      encoder_t <zmq_encoder_t> (bufsize_),      source (NULL), -    trim_prefix (trim_prefix_) +    trim (false)  {      zmq_msg_init (&in_progress); @@ -42,10 +42,15 @@ void zmq::zmq_encoder_t::set_inout (i_inout *source_)      source = source_;  } +void zmq::zmq_encoder_t::trim_prefix () +{ +    trim = true; +} +  bool zmq::zmq_encoder_t::size_ready ()  {      //  Write message body into the buffer. -    if (!trim_prefix) { +    if (!trim) {          next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),              &zmq_encoder_t::message_ready, false);      } @@ -75,7 +80,7 @@ bool zmq::zmq_encoder_t::message_ready ()      //  Get the message size. If the prefix is not to be sent, adjust the      //  size accordingly.      size_t size = zmq_msg_size (&in_progress); -    if (trim_prefix) +    if (trim)          size -= *(unsigned char*) zmq_msg_data (&in_progress);      //  For messages less than 255 bytes long, write one byte of message size. diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp index 8eedef8..0e23fcd 100644 --- a/src/zmq_encoder.hpp +++ b/src/zmq_encoder.hpp @@ -32,11 +32,15 @@ namespace zmq      {      public: -        zmq_encoder_t (size_t bufsize_, bool trim_prefix_); +        zmq_encoder_t (size_t bufsize_);          ~zmq_encoder_t ();          void set_inout (struct i_inout *source_); +        //  Once called, encoder will start trimming frefixes from outbound +        //  messages. +        void trim_prefix (); +      private:          bool size_ready (); @@ -46,7 +50,7 @@ namespace zmq          ::zmq_msg_t in_progress;          unsigned char tmpbuf [9]; -        bool trim_prefix; +        bool trim;          zmq_encoder_t (const zmq_encoder_t&);          void operator = (const zmq_encoder_t&); diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 01fe98c..bda098c 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -32,10 +32,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,      io_object_t (parent_),      inpos (NULL),      insize (0), -    decoder (in_batch_size, NULL, 0), +    decoder (in_batch_size),      outpos (NULL),      outsize (0), -    encoder (out_batch_size, false), +    encoder (out_batch_size),      inout (NULL),      options (options_),      reconnect (reconnect_) @@ -160,6 +160,13 @@ void zmq::zmq_engine_t::revive ()      out_event ();  } +void zmq::zmq_engine_t::traceroute (unsigned char *identity_, +    size_t identity_size_) +{ +    encoder.trim_prefix (); +    decoder.add_prefix (identity_, identity_size_); +} +  void zmq::zmq_engine_t::error ()  {      zmq_assert (inout); diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index ddd0931..174dd1a 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -47,6 +47,7 @@ namespace zmq          void plug (struct i_inout *inout_);          void unplug ();          void revive (); +        void traceroute (unsigned char *identity_, size_t identity_size_);          //  i_poll_events interface implementation.          void in_event (); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index 37aafe6..9492caa 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -76,6 +76,12 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)          zmq_msg_size (msg_));      received = true; +    //  Once the initial handshaking is over, XREP sockets should start +    //  tracerouting individual messages. +    if (options.type == ZMQ_XREP) +        engine->traceroute ((unsigned char*) peer_identity.data (), +            peer_identity.size ()); +      return true;  }  | 
