summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-02-16 18:30:38 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-02-16 18:30:38 +0100
commitb9caa319e279cd8cd367e0a64308b9e80c4ead3d (patch)
tree751d1eb31b0e41bf82f51ad3be69e13a2af70472 /src
parent2ddce205350f11dacd8d8550f7d4e6e088c7fbcd (diff)
Multi-hop REQ/REP, part XI., finalise the XREQ/XREP functionality
Diffstat (limited to 'src')
-rw-r--r--src/downstream.cpp2
-rw-r--r--src/downstream.hpp3
-rw-r--r--src/i_endpoint.hpp4
-rw-r--r--src/i_engine.hpp9
-rw-r--r--src/p2p.cpp2
-rw-r--r--src/p2p.hpp3
-rw-r--r--src/pgm_receiver.cpp8
-rw-r--r--src/pgm_receiver.hpp3
-rw-r--r--src/pgm_sender.cpp8
-rw-r--r--src/pgm_sender.hpp3
-rw-r--r--src/pub.cpp2
-rw-r--r--src/pub.hpp3
-rw-r--r--src/rep.cpp2
-rw-r--r--src/rep.hpp3
-rw-r--r--src/req.cpp2
-rw-r--r--src/req.hpp3
-rw-r--r--src/session.cpp7
-rw-r--r--src/session.hpp3
-rw-r--r--src/socket_base.cpp12
-rw-r--r--src/socket_base.hpp5
-rw-r--r--src/sub.cpp2
-rw-r--r--src/sub.hpp3
-rw-r--r--src/upstream.cpp2
-rw-r--r--src/upstream.hpp3
-rw-r--r--src/xrep.cpp51
-rw-r--r--src/xrep.hpp10
-rw-r--r--src/xreq.cpp2
-rw-r--r--src/xreq.hpp3
-rw-r--r--src/zmq_decoder.cpp48
-rw-r--r--src/zmq_encoder.cpp15
-rw-r--r--src/zmq_engine.cpp8
-rw-r--r--src/zmq_engine.hpp3
-rw-r--r--src/zmq_init.cpp6
33 files changed, 171 insertions, 72 deletions
diff --git a/src/downstream.cpp b/src/downstream.cpp
index 2da08e3..3431264 100644
--- a/src/downstream.cpp
+++ b/src/downstream.cpp
@@ -35,7 +35,7 @@ zmq::downstream_t::~downstream_t ()
}
void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_ && outpipe_);
lb.attach (outpipe_);
diff --git a/src/downstream.hpp b/src/downstream.hpp
index 35dec95..dbd79a5 100644
--- a/src/downstream.hpp
+++ b/src/downstream.hpp
@@ -34,7 +34,8 @@ namespace zmq
~downstream_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp
index d60b39e..ddab6a4 100644
--- a/src/i_endpoint.hpp
+++ b/src/i_endpoint.hpp
@@ -20,6 +20,8 @@
#ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__
#define __ZMQ_I_ENDPOINT_HPP_INCLUDED__
+#include "blob.hpp"
+
namespace zmq
{
@@ -28,7 +30,7 @@ namespace zmq
virtual ~i_endpoint () {}
virtual void attach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_) = 0;
+ class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
virtual void detach_inpipe (class reader_t *pipe_) = 0;
virtual void detach_outpipe (class writer_t *pipe_) = 0;
virtual void kill (class reader_t *pipe_) = 0;
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index d64027d..81b56df 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -41,10 +41,11 @@ namespace zmq
// are messages to send available.
virtual void revive () = 0;
- // Start tracing the message route. Engine should add the identity
- // supplied to all inbound messages and trim identity from all the
- // outbound messages.
- virtual void traceroute (const blob_t &identity_) = 0;
+ // Engine should add the prefix supplied to all inbound messages.
+ virtual void add_prefix (const blob_t &identity_) = 0;
+
+ // Engine should trim prefix from all the outbound messages.
+ virtual void trim_prefix () = 0;
};
}
diff --git a/src/p2p.cpp b/src/p2p.cpp
index 72bc26b..ca7a8f5 100644
--- a/src/p2p.cpp
+++ b/src/p2p.cpp
@@ -42,7 +42,7 @@ zmq::p2p_t::~p2p_t ()
}
void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe && !outpipe);
inpipe = inpipe_;
diff --git a/src/p2p.hpp b/src/p2p.hpp
index 2ff1047..bca0eab 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -33,7 +33,8 @@ namespace zmq
~p2p_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index b7ca327..e708229 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -88,7 +88,13 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
-void zmq::pgm_receiver_t::traceroute (const blob_t &identity_)
+void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_)
+{
+ // No need for tracerouting functionality in PGM socket at the moment.
+ zmq_assert (false);
+}
+
+void zmq::pgm_receiver_t::trim_prefix ()
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index b01e5b0..3f0ef81 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -54,7 +54,8 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (const blob_t &identity_);
+ void add_prefix (const blob_t &identity_);
+ void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index acbc3fb..27b4d0c 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -102,7 +102,13 @@ void zmq::pgm_sender_t::revive ()
out_event ();
}
-void zmq::pgm_sender_t::traceroute (const blob_t &identity_)
+void zmq::pgm_sender_t::add_prefix (const blob_t &identity_)
+{
+ // No need for tracerouting functionality in PGM socket at the moment.
+ zmq_assert (false);
+}
+
+void zmq::pgm_sender_t::trim_prefix ()
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 7041610..951c417 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -52,7 +52,8 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (const blob_t &identity_);
+ void add_prefix (const blob_t &identity_);
+ void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pub.cpp b/src/pub.cpp
index 05bfdcf..5b9d48c 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -39,7 +39,7 @@ zmq::pub_t::~pub_t ()
}
void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_);
out_pipes.push_back (outpipe_);
diff --git a/src/pub.hpp b/src/pub.hpp
index 5b2f348..26142a4 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -34,7 +34,8 @@ namespace zmq
~pub_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/rep.cpp b/src/rep.cpp
index 8c5b86c..968427d 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -44,7 +44,7 @@ zmq::rep_t::~rep_t ()
}
void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
diff --git a/src/rep.hpp b/src/rep.hpp
index 7170da7..7ead321 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -34,7 +34,8 @@ namespace zmq
~rep_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/req.cpp b/src/req.cpp
index c9240e0..735f0aa 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -39,7 +39,7 @@ zmq::req_t::~req_t ()
}
void zmq::req_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
diff --git a/src/req.hpp b/src/req.hpp
index 60ee5e7..da8e61a 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -34,7 +34,8 @@ namespace zmq
~req_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/session.cpp b/src/session.cpp
index f86327e..74bd8ae 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -124,7 +124,7 @@ uint64_t zmq::session_t::get_ordinal ()
}
void zmq::session_t::attach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
if (inpipe_) {
zmq_assert (!in_pipe);
@@ -251,4 +251,9 @@ void zmq::session_t::process_attach (i_engine *engine_,
zmq_assert (engine_);
engine = engine_;
engine->plug (this);
+
+ // Once the initial handshaking is over tracerouting should trim prefixes
+ // from outbound messages.
+ if (options.traceroute)
+ engine->trim_prefix ();
}
diff --git a/src/session.hpp b/src/session.hpp
index d412728..872748c 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -51,7 +51,8 @@ namespace zmq
uint64_t get_ordinal ();
// i_endpoint interface implementation.
- void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_);
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 222b769..1607673 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -169,7 +169,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach the pipes to this socket object.
attach_pipes (in_pipe ? &in_pipe->reader : NULL,
- out_pipe ? &out_pipe->writer : NULL);
+ out_pipe ? &out_pipe->writer : NULL, blob_t ());
// Attach the pipes to the peer socket. Note that peer's seqnum
// was incremented in find_endpoint function. The callee is notified
@@ -211,11 +211,11 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach the pipes to the socket object.
attach_pipes (in_pipe ? &in_pipe->reader : NULL,
- out_pipe ? &out_pipe->writer : NULL);
+ out_pipe ? &out_pipe->writer : NULL, blob_t ());
// Attach the pipes to the session object.
session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
- in_pipe ? &in_pipe->writer : NULL);
+ in_pipe ? &in_pipe->writer : NULL, blob_t ());
}
// Activate the session.
@@ -553,13 +553,13 @@ void zmq::socket_base_t::revive (reader_t *pipe_)
}
void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
if (inpipe_)
inpipe_->set_endpoint (this);
if (outpipe_)
outpipe_->set_endpoint (this);
- xattach_pipes (inpipe_, outpipe_);
+ xattach_pipes (inpipe_, outpipe_, peer_identity_);
}
void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_)
@@ -582,7 +582,7 @@ void zmq::socket_base_t::process_own (owned_t *object_)
void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
const blob_t &peer_identity_)
{
- attach_pipes (in_pipe_, out_pipe_);
+ attach_pipes (in_pipe_, out_pipe_, peer_identity_);
}
void zmq::socket_base_t::process_term_req (owned_t *object_)
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index a1702a7..5327acc 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -87,7 +87,8 @@ namespace zmq
class session_t *find_session (uint64_t ordinal_);
// i_endpoint interface implementation.
- void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_);
@@ -100,7 +101,7 @@ namespace zmq
// Pipe management is done by individual socket types.
virtual void xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_) = 0;
+ class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
virtual void xdetach_inpipe (class reader_t *pipe_) = 0;
virtual void xdetach_outpipe (class writer_t *pipe_) = 0;
virtual void xkill (class reader_t *pipe_) = 0;
diff --git a/src/sub.cpp b/src/sub.cpp
index 06ed896..29ac951 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -39,7 +39,7 @@ zmq::sub_t::~sub_t ()
}
void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && !outpipe_);
fq.attach (inpipe_);
diff --git a/src/sub.hpp b/src/sub.hpp
index 9e7d6cc..8234b77 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -39,7 +39,8 @@ namespace zmq
protected:
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/upstream.cpp b/src/upstream.cpp
index bdcd5ef..d7238b9 100644
--- a/src/upstream.cpp
+++ b/src/upstream.cpp
@@ -34,7 +34,7 @@ zmq::upstream_t::~upstream_t ()
}
void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && !outpipe_);
fq.attach (inpipe_);
diff --git a/src/upstream.hpp b/src/upstream.hpp
index 1e6914b..d1ee7b1 100644
--- a/src/upstream.hpp
+++ b/src/upstream.hpp
@@ -34,7 +34,8 @@ namespace zmq
~upstream_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 328a832..6fa6bfa 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -21,6 +21,7 @@
#include "xrep.hpp"
#include "err.hpp"
+#include "pipe.hpp"
zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
socket_base_t (parent_)
@@ -42,12 +43,15 @@ zmq::xrep_t::~xrep_t ()
}
void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
fq.attach (inpipe_);
- zmq_assert (false);
+ // TODO: What if new connection has same peer identity as the old one?
+ bool ok = outpipes.insert (std::make_pair (
+ peer_identity_, outpipe_)).second;
+ zmq_assert (ok);
}
void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
@@ -58,6 +62,12 @@ void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
{
+ for (outpipes_t::iterator it = outpipes.begin ();
+ it != outpipes.end (); ++it)
+ if (it->second == pipe_) {
+ outpipes.erase (it);
+ return;
+ }
zmq_assert (false);
}
@@ -80,8 +90,35 @@ int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
{
- zmq_assert (false);
- return -1;
+ unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
+ size_t size = zmq_msg_size (msg_);
+
+ // Check whether the message is well-formed.
+ zmq_assert (size >= 1);
+ zmq_assert (size_t (*data + 1) <= size);
+
+ // Find the corresponding outbound pipe. If there's none, just drop the
+ // message.
+ // TODO: There's an allocation here! It's the critical path! Get rid of it!
+ blob_t identity (data + 1, *data);
+ outpipes_t::iterator it = outpipes.find (identity);
+ if (it == outpipes.end ()) {
+ int rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
+ rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
+ // Push message to the selected pipe.
+ it->second->write (msg_);
+ it->second->flush ();
+
+ // Detach the message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return 0;
}
int zmq::xrep_t::xflush ()
@@ -102,8 +139,10 @@ bool zmq::xrep_t::xhas_in ()
bool zmq::xrep_t::xhas_out ()
{
- zmq_assert (false);
- return false;
+ // In theory, XREP socket is always ready for writing. Whether actual
+ // attempt to write succeeds depends on whitch pipe the message is going
+ // to be routed to.
+ return true;
}
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 67ab02d..4534463 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -20,7 +20,10 @@
#ifndef __ZMQ_XREP_HPP_INCLUDED__
#define __ZMQ_XREP_HPP_INCLUDED__
+#include <map>
+
#include "socket_base.hpp"
+#include "blob.hpp"
#include "fq.hpp"
namespace zmq
@@ -34,7 +37,8 @@ namespace zmq
~xrep_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
@@ -51,6 +55,10 @@ namespace zmq
// Inbound messages are fair-queued.
fq_t fq;
+ // Outbound pipes indexed by the peer names.
+ typedef std::map <blob_t, class writer_t*> outpipes_t;
+ outpipes_t outpipes;
+
xrep_t (const xrep_t&);
void operator = (const xrep_t&);
};
diff --git a/src/xreq.cpp b/src/xreq.cpp
index a4310f8..dda924c 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -34,7 +34,7 @@ zmq::xreq_t::~xreq_t ()
}
void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
fq.attach (inpipe_);
diff --git a/src/xreq.hpp b/src/xreq.hpp
index d0cbb4f..e23e832 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -35,7 +35,8 @@ namespace zmq
~xreq_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
index 20e07bc..b1776df 100644
--- a/src/zmq_decoder.cpp
+++ b/src/zmq_decoder.cpp
@@ -63,16 +63,22 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- int rc = zmq_msg_init_size (&in_progress, prefix.size () + *tmpbuf);
- errno_assert (rc == 0);
-
- // Fill in the message prefix if any.
- if (!prefix.empty ())
- memcpy (zmq_msg_data (&in_progress), prefix.data (),
- prefix.size ());
-
- next_step ((unsigned char*) zmq_msg_data (&in_progress) +
- prefix.size (), *tmpbuf, &zmq_decoder_t::message_ready);
+ if (prefix.empty ()) {
+ int rc = zmq_msg_init_size (&in_progress, *tmpbuf);
+ errno_assert (rc == 0);
+ next_step (zmq_msg_data (&in_progress), *tmpbuf,
+ &zmq_decoder_t::message_ready);
+ }
+ else {
+ int rc = zmq_msg_init_size (&in_progress,
+ *tmpbuf + 1 + prefix.size ());
+ errno_assert (rc == 0);
+ unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress);
+ *data = (unsigned char) prefix.size ();
+ memcpy (data + 1, prefix.data (), *data);
+ next_step (data + *data + 1, *tmpbuf,
+ &zmq_decoder_t::message_ready);
+ }
}
return true;
}
@@ -87,15 +93,21 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- int rc = zmq_msg_init_size (&in_progress, prefix.size () + size);
- errno_assert (rc == 0);
-
- // Fill in the message prefix if any.
- if (!prefix.empty ())
- memcpy (zmq_msg_data (&in_progress), prefix.data (), prefix.size ());
+ if (prefix.empty ()) {
+ int rc = zmq_msg_init_size (&in_progress, size);
+ errno_assert (rc == 0);
+ next_step (zmq_msg_data (&in_progress), size,
+ &zmq_decoder_t::message_ready);
+ }
+ else {
+ int rc = zmq_msg_init_size (&in_progress, size + 1 + prefix.size ());
+ errno_assert (rc == 0);
+ unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress);
+ *data = (unsigned char) prefix.size ();
+ memcpy (data + 1, prefix.data (), *data);
+ next_step (data + *data + 1, size, &zmq_decoder_t::message_ready);
+ }
- next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix.size (),
- size, &zmq_decoder_t::message_ready);
return true;
}
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index a60fe5e..5fca182 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -56,8 +56,9 @@ bool zmq::zmq_encoder_t::size_ready ()
}
else {
size_t prefix_size = *(unsigned char*) zmq_msg_data (&in_progress);
- next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size,
- zmq_msg_size (&in_progress) - prefix_size,
+ next_step (
+ (unsigned char*) zmq_msg_data (&in_progress) + prefix_size + 1,
+ zmq_msg_size (&in_progress) - prefix_size - 1,
&zmq_encoder_t::message_ready, false);
}
return true;
@@ -80,8 +81,14 @@ bool zmq::zmq_encoder_t::message_ready ()
// Get the message size. If the prefix is not to be sent, adjust the
// size accordingly.
size_t size = zmq_msg_size (&in_progress);
- if (trim)
- size -= *(unsigned char*) zmq_msg_data (&in_progress);
+ if (trim) {
+ zmq_assert (size);
+ size_t prefix_size =
+ (*(unsigned char*) zmq_msg_data (&in_progress)) + 1;
+ zmq_assert (prefix_size <= size);
+ size -= prefix_size;
+ }
+
// For messages less than 255 bytes long, write one byte of message size.
// For longer messages write 0xff escape character followed by 8-byte
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 75f3441..152daf6 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -160,10 +160,14 @@ void zmq::zmq_engine_t::revive ()
out_event ();
}
-void zmq::zmq_engine_t::traceroute (const blob_t &identity_)
+void zmq::zmq_engine_t::add_prefix (const blob_t &identity_)
+{
+ decoder.add_prefix (identity_);
+}
+
+void zmq::zmq_engine_t::trim_prefix ()
{
encoder.trim_prefix ();
- decoder.add_prefix (identity_);
}
void zmq::zmq_engine_t::error ()
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index 8657e8e..dc90a98 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -47,7 +47,8 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (const blob_t &identity_);
+ void add_prefix (const blob_t &identity_);
+ void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index 9aebad0..7c5588f 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -74,13 +74,9 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
// Retreieve the remote identity.
peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
+ engine->add_prefix (peer_identity);
received = true;
- // Once the initial handshaking is over, XREP sockets should start
- // tracerouting individual messages.
- if (options.traceroute)
- engine->traceroute (peer_identity);
-
return true;
}