From ec81f8fb2523e1e2fe45eaadc05311a35bf551d7 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 22 Jun 2011 11:02:16 +0200 Subject: New wire format for REQ/REP pattern This patch introduces two changes: 1. 32-bit ID is used to identify the peer instead of UUID 2. REQ socket seeds the label stack with unique 32-bit request ID It also drops any replies with non-matching request ID Signed-off-by: Martin Sustrik --- src/xrep.cpp | 45 ++++++++++++++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 13 deletions(-) (limited to 'src/xrep.cpp') diff --git a/src/xrep.cpp b/src/xrep.cpp index b935c06..ab2f0a8 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -20,6 +20,8 @@ #include "xrep.hpp" #include "pipe.hpp" +#include "wire.hpp" +#include "random.hpp" #include "err.hpp" zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : @@ -32,9 +34,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : { options.type = ZMQ_XREP; - // On connect, pipes are created only after initial handshaking. - // That way we are aware of the peer's identity when binding to the pipes. - options.immediate_connect = false; + // Start the peer ID sequence from a random point. + generate_random (&next_peer_id, sizeof (next_peer_id)); } zmq::xrep_t::~xrep_t () @@ -47,16 +48,33 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (pipe_); + // Generate a new peer ID. Take care to avoid duplicates. + outpipes_t::iterator it = outpipes.lower_bound (next_peer_id); + if (!outpipes.empty ()) { + while (true) { + if (it == outpipes.end ()) + it = outpipes.begin (); + if (it->first != next_peer_id) + break; + ++next_peer_id; + ++it; + } + } + // Add the pipe to the map out outbound pipes. - // TODO: What if new connection has same peer identity as the old one? outpipe_t outpipe = {pipe_, true}; bool ok = outpipes.insert (outpipes_t::value_type ( - peer_identity_, outpipe)).second; + next_peer_id, outpipe)).second; zmq_assert (ok); // Add the pipe to the list of inbound pipes. - inpipe_t inpipe = {pipe_, peer_identity_, true}; + inpipe_t inpipe = {pipe_, next_peer_id, true}; inpipes.push_back (inpipe); + + // Advance next peer ID so that if new connection is dropped shortly after + // its creation we don't accidentally get two subsequent peers with + // the same ID. + ++next_peer_id; } void zmq::xrep_t::xterminated (pipe_t *pipe_) @@ -115,7 +133,7 @@ void zmq::xrep_t::xwrite_activated (pipe_t *pipe_) int zmq::xrep_t::xsend (msg_t *msg_, int flags_) { - // If this is the first part of the message it's the identity of the + // If this is the first part of the message it's the ID of the // peer to send the message to. if (!more_out) { zmq_assert (!current_out); @@ -127,10 +145,11 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) more_out = true; - // Find the pipe associated with the identity stored in the prefix. + // Find the pipe associated with the peer ID stored in the prefix. // If there's no such pipe just silently ignore the message. - blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); - outpipes_t::iterator it = outpipes.find (identity); + zmq_assert (msg_->size () == 4); + uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ()); + outpipes_t::iterator it = outpipes.find (peer_id); if (it != outpipes.end ()) { current_out = it->second.pipe; @@ -220,10 +239,10 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) // If we have a message, create a prefix and return it to the caller. if (prefetched) { - int rc = msg_->init_size (inpipes [current_in].identity.size ()); + int rc = msg_->init_size (4); errno_assert (rc == 0); - memcpy (msg_->data (), inpipes [current_in].identity.data (), - msg_->size ()); + put_uint32 ((unsigned char*) msg_->data (), + inpipes [current_in].peer_id); msg_->set_flags (msg_t::label); return 0; } -- cgit v1.2.3