From b9caa319e279cd8cd367e0a64308b9e80c4ead3d Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 16 Feb 2010 18:30:38 +0100 Subject: Multi-hop REQ/REP, part XI., finalise the XREQ/XREP functionality --- src/downstream.cpp | 2 +- src/downstream.hpp | 3 ++- src/i_endpoint.hpp | 4 +++- src/i_engine.hpp | 9 +++++---- src/p2p.cpp | 2 +- src/p2p.hpp | 3 ++- src/pgm_receiver.cpp | 8 +++++++- src/pgm_receiver.hpp | 3 ++- src/pgm_sender.cpp | 8 +++++++- src/pgm_sender.hpp | 3 ++- src/pub.cpp | 2 +- src/pub.hpp | 3 ++- src/rep.cpp | 2 +- src/rep.hpp | 3 ++- src/req.cpp | 2 +- src/req.hpp | 3 ++- src/session.cpp | 7 ++++++- src/session.hpp | 3 ++- src/socket_base.cpp | 12 ++++++------ src/socket_base.hpp | 5 +++-- src/sub.cpp | 2 +- src/sub.hpp | 3 ++- src/upstream.cpp | 2 +- src/upstream.hpp | 3 ++- src/xrep.cpp | 51 +++++++++++++++++++++++++++++++++++++++++++++------ src/xrep.hpp | 10 +++++++++- src/xreq.cpp | 2 +- src/xreq.hpp | 3 ++- src/zmq_decoder.cpp | 48 ++++++++++++++++++++++++++++++------------------ src/zmq_encoder.cpp | 15 +++++++++++---- src/zmq_engine.cpp | 8 ++++++-- src/zmq_engine.hpp | 3 ++- src/zmq_init.cpp | 6 +----- 33 files changed, 171 insertions(+), 72 deletions(-) (limited to 'src') diff --git a/src/downstream.cpp b/src/downstream.cpp index 2da08e3..3431264 100644 --- a/src/downstream.cpp +++ b/src/downstream.cpp @@ -35,7 +35,7 @@ zmq::downstream_t::~downstream_t () } void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (!inpipe_ && outpipe_); lb.attach (outpipe_); diff --git a/src/downstream.hpp b/src/downstream.hpp index 35dec95..dbd79a5 100644 --- a/src/downstream.hpp +++ b/src/downstream.hpp @@ -34,7 +34,8 @@ namespace zmq ~downstream_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp index d60b39e..ddab6a4 100644 --- a/src/i_endpoint.hpp +++ b/src/i_endpoint.hpp @@ -20,6 +20,8 @@ #ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__ #define __ZMQ_I_ENDPOINT_HPP_INCLUDED__ +#include "blob.hpp" + namespace zmq { @@ -28,7 +30,7 @@ namespace zmq virtual ~i_endpoint () {} virtual void attach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) = 0; + class writer_t *outpipe_, const blob_t &peer_identity_) = 0; virtual void detach_inpipe (class reader_t *pipe_) = 0; virtual void detach_outpipe (class writer_t *pipe_) = 0; virtual void kill (class reader_t *pipe_) = 0; diff --git a/src/i_engine.hpp b/src/i_engine.hpp index d64027d..81b56df 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -41,10 +41,11 @@ namespace zmq // are messages to send available. virtual void revive () = 0; - // Start tracing the message route. Engine should add the identity - // supplied to all inbound messages and trim identity from all the - // outbound messages. - virtual void traceroute (const blob_t &identity_) = 0; + // Engine should add the prefix supplied to all inbound messages. + virtual void add_prefix (const blob_t &identity_) = 0; + + // Engine should trim prefix from all the outbound messages. + virtual void trim_prefix () = 0; }; } diff --git a/src/p2p.cpp b/src/p2p.cpp index 72bc26b..ca7a8f5 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -42,7 +42,7 @@ zmq::p2p_t::~p2p_t () } void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (!inpipe && !outpipe); inpipe = inpipe_; diff --git a/src/p2p.hpp b/src/p2p.hpp index 2ff1047..bca0eab 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -33,7 +33,8 @@ namespace zmq ~p2p_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index b7ca327..e708229 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -88,7 +88,13 @@ void zmq::pgm_receiver_t::revive () zmq_assert (false); } -void zmq::pgm_receiver_t::traceroute (const blob_t &identity_) +void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_) +{ + // No need for tracerouting functionality in PGM socket at the moment. + zmq_assert (false); +} + +void zmq::pgm_receiver_t::trim_prefix () { // No need for tracerouting functionality in PGM socket at the moment. zmq_assert (false); diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index b01e5b0..3f0ef81 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -54,7 +54,8 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); - void traceroute (const blob_t &identity_); + void add_prefix (const blob_t &identity_); + void trim_prefix (); // i_poll_events interface implementation. void in_event (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index acbc3fb..27b4d0c 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -102,7 +102,13 @@ void zmq::pgm_sender_t::revive () out_event (); } -void zmq::pgm_sender_t::traceroute (const blob_t &identity_) +void zmq::pgm_sender_t::add_prefix (const blob_t &identity_) +{ + // No need for tracerouting functionality in PGM socket at the moment. + zmq_assert (false); +} + +void zmq::pgm_sender_t::trim_prefix () { // No need for tracerouting functionality in PGM socket at the moment. zmq_assert (false); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 7041610..951c417 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -52,7 +52,8 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); - void traceroute (const blob_t &identity_); + void add_prefix (const blob_t &identity_); + void trim_prefix (); // i_poll_events interface implementation. void in_event (); diff --git a/src/pub.cpp b/src/pub.cpp index 05bfdcf..5b9d48c 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -39,7 +39,7 @@ zmq::pub_t::~pub_t () } void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (!inpipe_); out_pipes.push_back (outpipe_); diff --git a/src/pub.hpp b/src/pub.hpp index 5b2f348..26142a4 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -34,7 +34,8 @@ namespace zmq ~pub_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/rep.cpp b/src/rep.cpp index 8c5b86c..968427d 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -44,7 +44,7 @@ zmq::rep_t::~rep_t () } void zmq::rep_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (inpipe_ && outpipe_); zmq_assert (in_pipes.size () == out_pipes.size ()); diff --git a/src/rep.hpp b/src/rep.hpp index 7170da7..7ead321 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -34,7 +34,8 @@ namespace zmq ~rep_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/req.cpp b/src/req.cpp index c9240e0..735f0aa 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -39,7 +39,7 @@ zmq::req_t::~req_t () } void zmq::req_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (inpipe_ && outpipe_); zmq_assert (in_pipes.size () == out_pipes.size ()); diff --git a/src/req.hpp b/src/req.hpp index 60ee5e7..da8e61a 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -34,7 +34,8 @@ namespace zmq ~req_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/session.cpp b/src/session.cpp index f86327e..74bd8ae 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -124,7 +124,7 @@ uint64_t zmq::session_t::get_ordinal () } void zmq::session_t::attach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { if (inpipe_) { zmq_assert (!in_pipe); @@ -251,4 +251,9 @@ void zmq::session_t::process_attach (i_engine *engine_, zmq_assert (engine_); engine = engine_; engine->plug (this); + + // Once the initial handshaking is over tracerouting should trim prefixes + // from outbound messages. + if (options.traceroute) + engine->trim_prefix (); } diff --git a/src/session.hpp b/src/session.hpp index d412728..872748c 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -51,7 +51,8 @@ namespace zmq uint64_t get_ordinal (); // i_endpoint interface implementation. - void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void detach_inpipe (class reader_t *pipe_); void detach_outpipe (class writer_t *pipe_); void kill (class reader_t *pipe_); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 222b769..1607673 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -169,7 +169,7 @@ int zmq::socket_base_t::connect (const char *addr_) // Attach the pipes to this socket object. attach_pipes (in_pipe ? &in_pipe->reader : NULL, - out_pipe ? &out_pipe->writer : NULL); + out_pipe ? &out_pipe->writer : NULL, blob_t ()); // Attach the pipes to the peer socket. Note that peer's seqnum // was incremented in find_endpoint function. The callee is notified @@ -211,11 +211,11 @@ int zmq::socket_base_t::connect (const char *addr_) // Attach the pipes to the socket object. attach_pipes (in_pipe ? &in_pipe->reader : NULL, - out_pipe ? &out_pipe->writer : NULL); + out_pipe ? &out_pipe->writer : NULL, blob_t ()); // Attach the pipes to the session object. session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, - in_pipe ? &in_pipe->writer : NULL); + in_pipe ? &in_pipe->writer : NULL, blob_t ()); } // Activate the session. @@ -553,13 +553,13 @@ void zmq::socket_base_t::revive (reader_t *pipe_) } void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { if (inpipe_) inpipe_->set_endpoint (this); if (outpipe_) outpipe_->set_endpoint (this); - xattach_pipes (inpipe_, outpipe_); + xattach_pipes (inpipe_, outpipe_, peer_identity_); } void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_) @@ -582,7 +582,7 @@ void zmq::socket_base_t::process_own (owned_t *object_) void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_) { - attach_pipes (in_pipe_, out_pipe_); + attach_pipes (in_pipe_, out_pipe_, peer_identity_); } void zmq::socket_base_t::process_term_req (owned_t *object_) diff --git a/src/socket_base.hpp b/src/socket_base.hpp index a1702a7..5327acc 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -87,7 +87,8 @@ namespace zmq class session_t *find_session (uint64_t ordinal_); // i_endpoint interface implementation. - void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void detach_inpipe (class reader_t *pipe_); void detach_outpipe (class writer_t *pipe_); void kill (class reader_t *pipe_); @@ -100,7 +101,7 @@ namespace zmq // Pipe management is done by individual socket types. virtual void xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) = 0; + class writer_t *outpipe_, const blob_t &peer_identity_) = 0; virtual void xdetach_inpipe (class reader_t *pipe_) = 0; virtual void xdetach_outpipe (class writer_t *pipe_) = 0; virtual void xkill (class reader_t *pipe_) = 0; diff --git a/src/sub.cpp b/src/sub.cpp index 06ed896..29ac951 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -39,7 +39,7 @@ zmq::sub_t::~sub_t () } void zmq::sub_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (inpipe_ && !outpipe_); fq.attach (inpipe_); diff --git a/src/sub.hpp b/src/sub.hpp index 9e7d6cc..8234b77 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -39,7 +39,8 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/upstream.cpp b/src/upstream.cpp index bdcd5ef..d7238b9 100644 --- a/src/upstream.cpp +++ b/src/upstream.cpp @@ -34,7 +34,7 @@ zmq::upstream_t::~upstream_t () } void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (inpipe_ && !outpipe_); fq.attach (inpipe_); diff --git a/src/upstream.hpp b/src/upstream.hpp index 1e6914b..d1ee7b1 100644 --- a/src/upstream.hpp +++ b/src/upstream.hpp @@ -34,7 +34,8 @@ namespace zmq ~upstream_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/xrep.cpp b/src/xrep.cpp index 328a832..6fa6bfa 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -21,6 +21,7 @@ #include "xrep.hpp" #include "err.hpp" +#include "pipe.hpp" zmq::xrep_t::xrep_t (class app_thread_t *parent_) : socket_base_t (parent_) @@ -42,12 +43,15 @@ zmq::xrep_t::~xrep_t () } void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (inpipe_ && outpipe_); fq.attach (inpipe_); - zmq_assert (false); + // TODO: What if new connection has same peer identity as the old one? + bool ok = outpipes.insert (std::make_pair ( + peer_identity_, outpipe_)).second; + zmq_assert (ok); } void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_) @@ -58,6 +62,12 @@ void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_) void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_) { + for (outpipes_t::iterator it = outpipes.begin (); + it != outpipes.end (); ++it) + if (it->second == pipe_) { + outpipes.erase (it); + return; + } zmq_assert (false); } @@ -80,8 +90,35 @@ int zmq::xrep_t::xsetsockopt (int option_, const void *optval_, int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) { - zmq_assert (false); - return -1; + unsigned char *data = (unsigned char*) zmq_msg_data (msg_); + size_t size = zmq_msg_size (msg_); + + // Check whether the message is well-formed. + zmq_assert (size >= 1); + zmq_assert (size_t (*data + 1) <= size); + + // Find the corresponding outbound pipe. If there's none, just drop the + // message. + // TODO: There's an allocation here! It's the critical path! Get rid of it! + blob_t identity (data + 1, *data); + outpipes_t::iterator it = outpipes.find (identity); + if (it == outpipes.end ()) { + int rc = zmq_msg_close (msg_); + zmq_assert (rc == 0); + rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return 0; + } + + // Push message to the selected pipe. + it->second->write (msg_); + it->second->flush (); + + // Detach the message from the data buffer. + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + + return 0; } int zmq::xrep_t::xflush () @@ -102,8 +139,10 @@ bool zmq::xrep_t::xhas_in () bool zmq::xrep_t::xhas_out () { - zmq_assert (false); - return false; + // In theory, XREP socket is always ready for writing. Whether actual + // attempt to write succeeds depends on whitch pipe the message is going + // to be routed to. + return true; } diff --git a/src/xrep.hpp b/src/xrep.hpp index 67ab02d..4534463 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -20,7 +20,10 @@ #ifndef __ZMQ_XREP_HPP_INCLUDED__ #define __ZMQ_XREP_HPP_INCLUDED__ +#include + #include "socket_base.hpp" +#include "blob.hpp" #include "fq.hpp" namespace zmq @@ -34,7 +37,8 @@ namespace zmq ~xrep_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); @@ -51,6 +55,10 @@ namespace zmq // Inbound messages are fair-queued. fq_t fq; + // Outbound pipes indexed by the peer names. + typedef std::map outpipes_t; + outpipes_t outpipes; + xrep_t (const xrep_t&); void operator = (const xrep_t&); }; diff --git a/src/xreq.cpp b/src/xreq.cpp index a4310f8..dda924c 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -34,7 +34,7 @@ zmq::xreq_t::~xreq_t () } void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (inpipe_ && outpipe_); fq.attach (inpipe_); diff --git a/src/xreq.hpp b/src/xreq.hpp index d0cbb4f..e23e832 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -35,7 +35,8 @@ namespace zmq ~xreq_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index 20e07bc..b1776df 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -63,16 +63,22 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - int rc = zmq_msg_init_size (&in_progress, prefix.size () + *tmpbuf); - errno_assert (rc == 0); - - // Fill in the message prefix if any. - if (!prefix.empty ()) - memcpy (zmq_msg_data (&in_progress), prefix.data (), - prefix.size ()); - - next_step ((unsigned char*) zmq_msg_data (&in_progress) + - prefix.size (), *tmpbuf, &zmq_decoder_t::message_ready); + if (prefix.empty ()) { + int rc = zmq_msg_init_size (&in_progress, *tmpbuf); + errno_assert (rc == 0); + next_step (zmq_msg_data (&in_progress), *tmpbuf, + &zmq_decoder_t::message_ready); + } + else { + int rc = zmq_msg_init_size (&in_progress, + *tmpbuf + 1 + prefix.size ()); + errno_assert (rc == 0); + unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress); + *data = (unsigned char) prefix.size (); + memcpy (data + 1, prefix.data (), *data); + next_step (data + *data + 1, *tmpbuf, + &zmq_decoder_t::message_ready); + } } return true; } @@ -87,15 +93,21 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - int rc = zmq_msg_init_size (&in_progress, prefix.size () + size); - errno_assert (rc == 0); - - // Fill in the message prefix if any. - if (!prefix.empty ()) - memcpy (zmq_msg_data (&in_progress), prefix.data (), prefix.size ()); + if (prefix.empty ()) { + int rc = zmq_msg_init_size (&in_progress, size); + errno_assert (rc == 0); + next_step (zmq_msg_data (&in_progress), size, + &zmq_decoder_t::message_ready); + } + else { + int rc = zmq_msg_init_size (&in_progress, size + 1 + prefix.size ()); + errno_assert (rc == 0); + unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress); + *data = (unsigned char) prefix.size (); + memcpy (data + 1, prefix.data (), *data); + next_step (data + *data + 1, size, &zmq_decoder_t::message_ready); + } - next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix.size (), - size, &zmq_decoder_t::message_ready); return true; } diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index a60fe5e..5fca182 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -56,8 +56,9 @@ bool zmq::zmq_encoder_t::size_ready () } else { size_t prefix_size = *(unsigned char*) zmq_msg_data (&in_progress); - next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size, - zmq_msg_size (&in_progress) - prefix_size, + next_step ( + (unsigned char*) zmq_msg_data (&in_progress) + prefix_size + 1, + zmq_msg_size (&in_progress) - prefix_size - 1, &zmq_encoder_t::message_ready, false); } return true; @@ -80,8 +81,14 @@ bool zmq::zmq_encoder_t::message_ready () // Get the message size. If the prefix is not to be sent, adjust the // size accordingly. size_t size = zmq_msg_size (&in_progress); - if (trim) - size -= *(unsigned char*) zmq_msg_data (&in_progress); + if (trim) { + zmq_assert (size); + size_t prefix_size = + (*(unsigned char*) zmq_msg_data (&in_progress)) + 1; + zmq_assert (prefix_size <= size); + size -= prefix_size; + } + // For messages less than 255 bytes long, write one byte of message size. // For longer messages write 0xff escape character followed by 8-byte diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 75f3441..152daf6 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -160,10 +160,14 @@ void zmq::zmq_engine_t::revive () out_event (); } -void zmq::zmq_engine_t::traceroute (const blob_t &identity_) +void zmq::zmq_engine_t::add_prefix (const blob_t &identity_) +{ + decoder.add_prefix (identity_); +} + +void zmq::zmq_engine_t::trim_prefix () { encoder.trim_prefix (); - decoder.add_prefix (identity_); } void zmq::zmq_engine_t::error () diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 8657e8e..dc90a98 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -47,7 +47,8 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); - void traceroute (const blob_t &identity_); + void add_prefix (const blob_t &identity_); + void trim_prefix (); // i_poll_events interface implementation. void in_event (); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index 9aebad0..7c5588f 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -74,13 +74,9 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) // Retreieve the remote identity. peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_), zmq_msg_size (msg_)); + engine->add_prefix (peer_identity); received = true; - // Once the initial handshaking is over, XREP sockets should start - // tracerouting individual messages. - if (options.traceroute) - engine->traceroute (peer_identity); - return true; } -- cgit v1.2.3