diff options
| -rw-r--r-- | src/downstream.cpp | 2 | ||||
| -rw-r--r-- | src/downstream.hpp | 3 | ||||
| -rw-r--r-- | src/i_endpoint.hpp | 4 | ||||
| -rw-r--r-- | src/i_engine.hpp | 9 | ||||
| -rw-r--r-- | src/p2p.cpp | 2 | ||||
| -rw-r--r-- | src/p2p.hpp | 3 | ||||
| -rw-r--r-- | src/pgm_receiver.cpp | 8 | ||||
| -rw-r--r-- | src/pgm_receiver.hpp | 3 | ||||
| -rw-r--r-- | src/pgm_sender.cpp | 8 | ||||
| -rw-r--r-- | src/pgm_sender.hpp | 3 | ||||
| -rw-r--r-- | src/pub.cpp | 2 | ||||
| -rw-r--r-- | src/pub.hpp | 3 | ||||
| -rw-r--r-- | src/rep.cpp | 2 | ||||
| -rw-r--r-- | src/rep.hpp | 3 | ||||
| -rw-r--r-- | src/req.cpp | 2 | ||||
| -rw-r--r-- | src/req.hpp | 3 | ||||
| -rw-r--r-- | src/session.cpp | 7 | ||||
| -rw-r--r-- | src/session.hpp | 3 | ||||
| -rw-r--r-- | src/socket_base.cpp | 12 | ||||
| -rw-r--r-- | src/socket_base.hpp | 5 | ||||
| -rw-r--r-- | src/sub.cpp | 2 | ||||
| -rw-r--r-- | src/sub.hpp | 3 | ||||
| -rw-r--r-- | src/upstream.cpp | 2 | ||||
| -rw-r--r-- | src/upstream.hpp | 3 | ||||
| -rw-r--r-- | src/xrep.cpp | 51 | ||||
| -rw-r--r-- | src/xrep.hpp | 10 | ||||
| -rw-r--r-- | src/xreq.cpp | 2 | ||||
| -rw-r--r-- | src/xreq.hpp | 3 | ||||
| -rw-r--r-- | src/zmq_decoder.cpp | 48 | ||||
| -rw-r--r-- | src/zmq_encoder.cpp | 15 | ||||
| -rw-r--r-- | src/zmq_engine.cpp | 8 | ||||
| -rw-r--r-- | src/zmq_engine.hpp | 3 | ||||
| -rw-r--r-- | src/zmq_init.cpp | 6 | 
33 files changed, 171 insertions, 72 deletions
| diff --git a/src/downstream.cpp b/src/downstream.cpp index 2da08e3..3431264 100644 --- a/src/downstream.cpp +++ b/src/downstream.cpp @@ -35,7 +35,7 @@ zmq::downstream_t::~downstream_t ()  }  void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_) +    class writer_t *outpipe_, const blob_t &peer_identity_)  {      zmq_assert (!inpipe_ && outpipe_);      lb.attach (outpipe_); diff --git a/src/downstream.hpp b/src/downstream.hpp index 35dec95..dbd79a5 100644 --- a/src/downstream.hpp +++ b/src/downstream.hpp @@ -34,7 +34,8 @@ namespace zmq          ~downstream_t ();          //  Overloads of functions from socket_base_t. -        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, +            const blob_t &peer_identity_);          void xdetach_inpipe (class reader_t *pipe_);          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class reader_t *pipe_); diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp index d60b39e..ddab6a4 100644 --- a/src/i_endpoint.hpp +++ b/src/i_endpoint.hpp @@ -20,6 +20,8 @@  #ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__  #define __ZMQ_I_ENDPOINT_HPP_INCLUDED__ +#include "blob.hpp" +  namespace zmq  { @@ -28,7 +30,7 @@ namespace zmq          virtual ~i_endpoint () {}          virtual void attach_pipes (class reader_t *inpipe_, -            class writer_t *outpipe_) = 0; +            class writer_t *outpipe_, const blob_t &peer_identity_) = 0;          virtual void detach_inpipe (class reader_t *pipe_) = 0;          virtual void detach_outpipe (class writer_t *pipe_) = 0;          virtual void kill (class reader_t *pipe_) = 0; diff --git a/src/i_engine.hpp b/src/i_engine.hpp index d64027d..81b56df 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -41,10 +41,11 @@ namespace zmq          //  are messages to send available.          virtual void revive () = 0; -        //  Start tracing the message route. Engine should add the identity -        //  supplied to all inbound messages and trim identity from all the -        //  outbound messages. -        virtual void traceroute (const blob_t &identity_) = 0; +        //  Engine should add the prefix supplied to all inbound messages. +        virtual void add_prefix (const blob_t &identity_) = 0; + +        //  Engine should trim prefix from all the outbound messages. +        virtual void trim_prefix () = 0;      };  } diff --git a/src/p2p.cpp b/src/p2p.cpp index 72bc26b..ca7a8f5 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -42,7 +42,7 @@ zmq::p2p_t::~p2p_t ()  }  void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_) +    class writer_t *outpipe_, const blob_t &peer_identity_)  {      zmq_assert (!inpipe && !outpipe);      inpipe = inpipe_; diff --git a/src/p2p.hpp b/src/p2p.hpp index 2ff1047..bca0eab 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -33,7 +33,8 @@ namespace zmq          ~p2p_t ();          //  Overloads of functions from socket_base_t. -        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, +            const blob_t &peer_identity_);          void xdetach_inpipe (class reader_t *pipe_);          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class reader_t *pipe_); diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index b7ca327..e708229 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -88,7 +88,13 @@ void zmq::pgm_receiver_t::revive ()      zmq_assert (false);  } -void zmq::pgm_receiver_t::traceroute (const blob_t &identity_) +void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_) +{ +    //  No need for tracerouting functionality in PGM socket at the moment. +    zmq_assert (false); +} + +void zmq::pgm_receiver_t::trim_prefix ()  {      //  No need for tracerouting functionality in PGM socket at the moment.      zmq_assert (false); diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index b01e5b0..3f0ef81 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -54,7 +54,8 @@ namespace zmq          void plug (struct i_inout *inout_);          void unplug ();          void revive (); -        void traceroute (const blob_t &identity_); +        void add_prefix (const blob_t &identity_); +        void trim_prefix ();          //  i_poll_events interface implementation.          void in_event (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index acbc3fb..27b4d0c 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -102,7 +102,13 @@ void zmq::pgm_sender_t::revive ()      out_event ();  } -void zmq::pgm_sender_t::traceroute (const blob_t &identity_) +void zmq::pgm_sender_t::add_prefix (const blob_t &identity_) +{ +    //  No need for tracerouting functionality in PGM socket at the moment. +    zmq_assert (false); +} + +void zmq::pgm_sender_t::trim_prefix ()  {      //  No need for tracerouting functionality in PGM socket at the moment.      zmq_assert (false); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 7041610..951c417 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -52,7 +52,8 @@ namespace zmq          void plug (struct i_inout *inout_);          void unplug ();          void revive (); -        void traceroute (const blob_t &identity_); +        void add_prefix (const blob_t &identity_); +        void trim_prefix ();          //  i_poll_events interface implementation.          void in_event (); diff --git a/src/pub.cpp b/src/pub.cpp index 05bfdcf..5b9d48c 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -39,7 +39,7 @@ zmq::pub_t::~pub_t ()  }  void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_) +    class writer_t *outpipe_, const blob_t &peer_identity_)  {      zmq_assert (!inpipe_);      out_pipes.push_back (outpipe_); diff --git a/src/pub.hpp b/src/pub.hpp index 5b2f348..26142a4 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -34,7 +34,8 @@ namespace zmq          ~pub_t ();          //  Overloads of functions from socket_base_t. -        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, +            const blob_t &peer_identity_);          void xdetach_inpipe (class reader_t *pipe_);          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class reader_t *pipe_); diff --git a/src/rep.cpp b/src/rep.cpp index 8c5b86c..968427d 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -44,7 +44,7 @@ zmq::rep_t::~rep_t ()  }  void zmq::rep_t::xattach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_) +    class writer_t *outpipe_, const blob_t &peer_identity_)  {      zmq_assert (inpipe_ && outpipe_);      zmq_assert (in_pipes.size () == out_pipes.size ()); diff --git a/src/rep.hpp b/src/rep.hpp index 7170da7..7ead321 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -34,7 +34,8 @@ namespace zmq          ~rep_t ();          //  Overloads of functions from socket_base_t. -        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, +            const blob_t &peer_identity_);          void xdetach_inpipe (class reader_t *pipe_);          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class reader_t *pipe_); diff --git a/src/req.cpp b/src/req.cpp index c9240e0..735f0aa 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -39,7 +39,7 @@ zmq::req_t::~req_t ()  }  void zmq::req_t::xattach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_) +    class writer_t *outpipe_, const blob_t &peer_identity_)  {      zmq_assert (inpipe_ && outpipe_);      zmq_assert (in_pipes.size () == out_pipes.size ()); diff --git a/src/req.hpp b/src/req.hpp index 60ee5e7..da8e61a 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -34,7 +34,8 @@ namespace zmq          ~req_t ();          //  Overloads of functions from socket_base_t. -        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, +            const blob_t &peer_identity_);          void xdetach_inpipe (class reader_t *pipe_);          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class reader_t *pipe_); diff --git a/src/session.cpp b/src/session.cpp index f86327e..74bd8ae 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -124,7 +124,7 @@ uint64_t zmq::session_t::get_ordinal ()  }  void zmq::session_t::attach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_) +    class writer_t *outpipe_, const blob_t &peer_identity_)  {      if (inpipe_) {          zmq_assert (!in_pipe); @@ -251,4 +251,9 @@ void zmq::session_t::process_attach (i_engine *engine_,      zmq_assert (engine_);      engine = engine_;      engine->plug (this); + +    //  Once the initial handshaking is over tracerouting should trim prefixes +    //  from outbound messages. +    if (options.traceroute) +        engine->trim_prefix ();  } diff --git a/src/session.hpp b/src/session.hpp index d412728..872748c 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -51,7 +51,8 @@ namespace zmq          uint64_t get_ordinal ();          //  i_endpoint interface implementation. -        void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, +            const blob_t &peer_identity_);          void detach_inpipe (class reader_t *pipe_);          void detach_outpipe (class writer_t *pipe_);          void kill (class reader_t *pipe_); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 222b769..1607673 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -169,7 +169,7 @@ int zmq::socket_base_t::connect (const char *addr_)          //  Attach the pipes to this socket object.          attach_pipes (in_pipe ? &in_pipe->reader : NULL, -            out_pipe ? &out_pipe->writer : NULL); +            out_pipe ? &out_pipe->writer : NULL, blob_t ());          //  Attach the pipes to the peer socket. Note that peer's seqnum          //  was incremented in find_endpoint function. The callee is notified @@ -211,11 +211,11 @@ int zmq::socket_base_t::connect (const char *addr_)          //  Attach the pipes to the socket object.          attach_pipes (in_pipe ? &in_pipe->reader : NULL, -            out_pipe ? &out_pipe->writer : NULL); +            out_pipe ? &out_pipe->writer : NULL, blob_t ());          //  Attach the pipes to the session object.          session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, -            in_pipe ? &in_pipe->writer : NULL); +            in_pipe ? &in_pipe->writer : NULL, blob_t ());      }      //  Activate the session. @@ -553,13 +553,13 @@ void zmq::socket_base_t::revive (reader_t *pipe_)  }  void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_) +    class writer_t *outpipe_, const blob_t &peer_identity_)  {      if (inpipe_)          inpipe_->set_endpoint (this);      if (outpipe_)          outpipe_->set_endpoint (this); -    xattach_pipes (inpipe_, outpipe_); +    xattach_pipes (inpipe_, outpipe_, peer_identity_);  }  void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_) @@ -582,7 +582,7 @@ void zmq::socket_base_t::process_own (owned_t *object_)  void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,      const blob_t &peer_identity_)  { -    attach_pipes (in_pipe_, out_pipe_); +    attach_pipes (in_pipe_, out_pipe_, peer_identity_);  }  void zmq::socket_base_t::process_term_req (owned_t *object_) diff --git a/src/socket_base.hpp b/src/socket_base.hpp index a1702a7..5327acc 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -87,7 +87,8 @@ namespace zmq          class session_t *find_session (uint64_t ordinal_);          //  i_endpoint interface implementation. -        void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, +            const blob_t &peer_identity_);          void detach_inpipe (class reader_t *pipe_);          void detach_outpipe (class writer_t *pipe_);          void kill (class reader_t *pipe_); @@ -100,7 +101,7 @@ namespace zmq          //  Pipe management is done by individual socket types.          virtual void xattach_pipes (class reader_t *inpipe_, -            class writer_t *outpipe_) = 0; +            class writer_t *outpipe_, const blob_t &peer_identity_) = 0;          virtual void xdetach_inpipe (class reader_t *pipe_) = 0;          virtual void xdetach_outpipe (class writer_t *pipe_) = 0;          virtual void xkill (class reader_t *pipe_) = 0; diff --git a/src/sub.cpp b/src/sub.cpp index 06ed896..29ac951 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -39,7 +39,7 @@ zmq::sub_t::~sub_t ()  }  void zmq::sub_t::xattach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_) +    class writer_t *outpipe_, const blob_t &peer_identity_)  {      zmq_assert (inpipe_ && !outpipe_);      fq.attach (inpipe_); diff --git a/src/sub.hpp b/src/sub.hpp index 9e7d6cc..8234b77 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -39,7 +39,8 @@ namespace zmq      protected:          //  Overloads of functions from socket_base_t. -        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, +            const blob_t &peer_identity_);          void xdetach_inpipe (class reader_t *pipe_);          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class reader_t *pipe_); diff --git a/src/upstream.cpp b/src/upstream.cpp index bdcd5ef..d7238b9 100644 --- a/src/upstream.cpp +++ b/src/upstream.cpp @@ -34,7 +34,7 @@ zmq::upstream_t::~upstream_t ()  }  void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_) +    class writer_t *outpipe_, const blob_t &peer_identity_)  {      zmq_assert (inpipe_ && !outpipe_);      fq.attach (inpipe_); diff --git a/src/upstream.hpp b/src/upstream.hpp index 1e6914b..d1ee7b1 100644 --- a/src/upstream.hpp +++ b/src/upstream.hpp @@ -34,7 +34,8 @@ namespace zmq          ~upstream_t ();          //  Overloads of functions from socket_base_t. -        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, +            const blob_t &peer_identity_);          void xdetach_inpipe (class reader_t *pipe_);          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class reader_t *pipe_); diff --git a/src/xrep.cpp b/src/xrep.cpp index 328a832..6fa6bfa 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -21,6 +21,7 @@  #include "xrep.hpp"  #include "err.hpp" +#include "pipe.hpp"  zmq::xrep_t::xrep_t (class app_thread_t *parent_) :      socket_base_t (parent_) @@ -42,12 +43,15 @@ zmq::xrep_t::~xrep_t ()  }  void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_) +    class writer_t *outpipe_, const blob_t &peer_identity_)  {      zmq_assert (inpipe_ && outpipe_);      fq.attach (inpipe_); -    zmq_assert (false); +    //  TODO: What if new connection has same peer identity as the old one? +    bool ok = outpipes.insert (std::make_pair ( +        peer_identity_, outpipe_)).second; +    zmq_assert (ok);  }  void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_) @@ -58,6 +62,12 @@ void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)  void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)  { +    for (outpipes_t::iterator it = outpipes.begin (); +          it != outpipes.end (); ++it) +        if (it->second == pipe_) { +            outpipes.erase (it); +            return; +        }      zmq_assert (false);  } @@ -80,8 +90,35 @@ int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,  int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)  { -    zmq_assert (false); -    return -1; +    unsigned char *data = (unsigned char*) zmq_msg_data (msg_); +    size_t size = zmq_msg_size (msg_); + +    //  Check whether the message is well-formed. +    zmq_assert (size >= 1); +    zmq_assert (size_t (*data + 1) <= size); + +    //  Find the corresponding outbound pipe. If there's none, just drop the +    //  message. +    //  TODO: There's an allocation here! It's the critical path! Get rid of it! +    blob_t identity (data + 1, *data); +    outpipes_t::iterator it = outpipes.find (identity); +    if (it == outpipes.end ()) { +        int rc = zmq_msg_close (msg_); +        zmq_assert (rc == 0); +        rc = zmq_msg_init (msg_); +        zmq_assert (rc == 0); +        return 0; +    } + +    //  Push message to the selected pipe. +    it->second->write (msg_); +    it->second->flush (); + +    //  Detach the message from the data buffer. +    int rc = zmq_msg_init (msg_); +    zmq_assert (rc == 0); + +    return 0;  }  int zmq::xrep_t::xflush () @@ -102,8 +139,10 @@ bool zmq::xrep_t::xhas_in ()  bool zmq::xrep_t::xhas_out ()  { -    zmq_assert (false); -    return false; +    //  In theory, XREP socket is always ready for writing. Whether actual +    //  attempt to write succeeds depends on whitch pipe the message is going +    //  to be routed to. +    return true;  } diff --git a/src/xrep.hpp b/src/xrep.hpp index 67ab02d..4534463 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -20,7 +20,10 @@  #ifndef __ZMQ_XREP_HPP_INCLUDED__  #define __ZMQ_XREP_HPP_INCLUDED__ +#include <map> +  #include "socket_base.hpp" +#include "blob.hpp"  #include "fq.hpp"  namespace zmq @@ -34,7 +37,8 @@ namespace zmq          ~xrep_t ();          //  Overloads of functions from socket_base_t. -        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, +            const blob_t &peer_identity_);          void xdetach_inpipe (class reader_t *pipe_);          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class reader_t *pipe_); @@ -51,6 +55,10 @@ namespace zmq          //  Inbound messages are fair-queued.          fq_t fq; +        //  Outbound pipes indexed by the peer names. +        typedef std::map <blob_t, class writer_t*> outpipes_t; +        outpipes_t outpipes; +          xrep_t (const xrep_t&);          void operator = (const xrep_t&);      }; diff --git a/src/xreq.cpp b/src/xreq.cpp index a4310f8..dda924c 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -34,7 +34,7 @@ zmq::xreq_t::~xreq_t ()  }  void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_) +    class writer_t *outpipe_, const blob_t &peer_identity_)  {      zmq_assert (inpipe_ && outpipe_);      fq.attach (inpipe_); diff --git a/src/xreq.hpp b/src/xreq.hpp index d0cbb4f..e23e832 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -35,7 +35,8 @@ namespace zmq          ~xreq_t ();          //  Overloads of functions from socket_base_t. -        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, +            const blob_t &peer_identity_);          void xdetach_inpipe (class reader_t *pipe_);          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class reader_t *pipe_); diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index 20e07bc..b1776df 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -63,16 +63,22 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()          //  in_progress is initialised at this point so in theory we should          //  close it before calling zmq_msg_init_size, however, it's a 0-byte          //  message and thus we can treat it as uninitialised... -        int rc = zmq_msg_init_size (&in_progress, prefix.size () + *tmpbuf); -        errno_assert (rc == 0); - -        //  Fill in the message prefix if any. -        if (!prefix.empty ()) -            memcpy (zmq_msg_data (&in_progress), prefix.data (), -                prefix.size ()); - -        next_step ((unsigned char*) zmq_msg_data (&in_progress) + -            prefix.size (), *tmpbuf, &zmq_decoder_t::message_ready); +        if (prefix.empty ()) { +            int rc = zmq_msg_init_size (&in_progress, *tmpbuf); +            errno_assert (rc == 0); +            next_step (zmq_msg_data (&in_progress), *tmpbuf, +                &zmq_decoder_t::message_ready); +        } +        else { +            int rc = zmq_msg_init_size (&in_progress, +                *tmpbuf + 1 + prefix.size ()); +            errno_assert (rc == 0); +            unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress); +            *data = (unsigned char) prefix.size (); +            memcpy (data + 1, prefix.data (), *data); +            next_step (data + *data + 1, *tmpbuf, +                &zmq_decoder_t::message_ready); +        }      }      return true;  } @@ -87,15 +93,21 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()      //  in_progress is initialised at this point so in theory we should      //  close it before calling zmq_msg_init_size, however, it's a 0-byte      //  message and thus we can treat it as uninitialised... -    int rc = zmq_msg_init_size (&in_progress, prefix.size () + size); -    errno_assert (rc == 0); - -    //  Fill in the message prefix if any. -    if (!prefix.empty ()) -        memcpy (zmq_msg_data (&in_progress), prefix.data (), prefix.size ()); +    if (prefix.empty ()) { +        int rc = zmq_msg_init_size (&in_progress, size); +        errno_assert (rc == 0); +        next_step (zmq_msg_data (&in_progress), size, +            &zmq_decoder_t::message_ready); +    } +    else { +        int rc = zmq_msg_init_size (&in_progress, size + 1 + prefix.size ()); +        errno_assert (rc == 0); +        unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress); +        *data = (unsigned char) prefix.size (); +        memcpy (data + 1, prefix.data (), *data); +        next_step (data + *data + 1, size, &zmq_decoder_t::message_ready); +    } -    next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix.size (), -        size, &zmq_decoder_t::message_ready);      return true;  } diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index a60fe5e..5fca182 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -56,8 +56,9 @@ bool zmq::zmq_encoder_t::size_ready ()      }      else {          size_t prefix_size = *(unsigned char*) zmq_msg_data (&in_progress); -        next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size, -            zmq_msg_size (&in_progress) - prefix_size, +        next_step ( +            (unsigned char*) zmq_msg_data (&in_progress) + prefix_size + 1, +            zmq_msg_size (&in_progress) - prefix_size - 1,              &zmq_encoder_t::message_ready, false);      }      return true; @@ -80,8 +81,14 @@ bool zmq::zmq_encoder_t::message_ready ()      //  Get the message size. If the prefix is not to be sent, adjust the      //  size accordingly.      size_t size = zmq_msg_size (&in_progress); -    if (trim) -        size -= *(unsigned char*) zmq_msg_data (&in_progress); +    if (trim) { +        zmq_assert (size); +        size_t prefix_size = +            (*(unsigned char*) zmq_msg_data (&in_progress)) + 1; +        zmq_assert (prefix_size <= size); +        size -= prefix_size; +    } +      //  For messages less than 255 bytes long, write one byte of message size.      //  For longer messages write 0xff escape character followed by 8-byte diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 75f3441..152daf6 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -160,10 +160,14 @@ void zmq::zmq_engine_t::revive ()      out_event ();  } -void zmq::zmq_engine_t::traceroute (const blob_t &identity_) +void zmq::zmq_engine_t::add_prefix (const blob_t &identity_) +{     +    decoder.add_prefix (identity_); +} + +void zmq::zmq_engine_t::trim_prefix ()  {      encoder.trim_prefix (); -    decoder.add_prefix (identity_);  }  void zmq::zmq_engine_t::error () diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 8657e8e..dc90a98 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -47,7 +47,8 @@ namespace zmq          void plug (struct i_inout *inout_);          void unplug ();          void revive (); -        void traceroute (const blob_t &identity_); +        void add_prefix (const blob_t &identity_); +        void trim_prefix ();          //  i_poll_events interface implementation.          void in_event (); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index 9aebad0..7c5588f 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -74,13 +74,9 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)      //  Retreieve the remote identity.      peer_identity.assign ((const un | 
