summaryrefslogtreecommitdiff
path: root/src/xrep.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-06-22 11:02:16 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-06-22 11:02:16 +0200
commitec81f8fb2523e1e2fe45eaadc05311a35bf551d7 (patch)
treee6fbd9b7a789d72678fa02ca06883de15a932beb /src/xrep.cpp
parent10a93bb79fd3d4be1b3ffedfa6785564fbcc082b (diff)
New wire format for REQ/REP pattern
This patch introduces two changes: 1. 32-bit ID is used to identify the peer instead of UUID 2. REQ socket seeds the label stack with unique 32-bit request ID It also drops any replies with non-matching request ID Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xrep.cpp')
-rw-r--r--src/xrep.cpp45
1 files changed, 32 insertions, 13 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp
index b935c06..ab2f0a8 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -20,6 +20,8 @@
#include "xrep.hpp"
#include "pipe.hpp"
+#include "wire.hpp"
+#include "random.hpp"
#include "err.hpp"
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
@@ -32,9 +34,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
{
options.type = ZMQ_XREP;
- // On connect, pipes are created only after initial handshaking.
- // That way we are aware of the peer's identity when binding to the pipes.
- options.immediate_connect = false;
+ // Start the peer ID sequence from a random point.
+ generate_random (&next_peer_id, sizeof (next_peer_id));
}
zmq::xrep_t::~xrep_t ()
@@ -47,16 +48,33 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
zmq_assert (pipe_);
+ // Generate a new peer ID. Take care to avoid duplicates.
+ outpipes_t::iterator it = outpipes.lower_bound (next_peer_id);
+ if (!outpipes.empty ()) {
+ while (true) {
+ if (it == outpipes.end ())
+ it = outpipes.begin ();
+ if (it->first != next_peer_id)
+ break;
+ ++next_peer_id;
+ ++it;
+ }
+ }
+
// Add the pipe to the map out outbound pipes.
- // TODO: What if new connection has same peer identity as the old one?
outpipe_t outpipe = {pipe_, true};
bool ok = outpipes.insert (outpipes_t::value_type (
- peer_identity_, outpipe)).second;
+ next_peer_id, outpipe)).second;
zmq_assert (ok);
// Add the pipe to the list of inbound pipes.
- inpipe_t inpipe = {pipe_, peer_identity_, true};
+ inpipe_t inpipe = {pipe_, next_peer_id, true};
inpipes.push_back (inpipe);
+
+ // Advance next peer ID so that if new connection is dropped shortly after
+ // its creation we don't accidentally get two subsequent peers with
+ // the same ID.
+ ++next_peer_id;
}
void zmq::xrep_t::xterminated (pipe_t *pipe_)
@@ -115,7 +133,7 @@ void zmq::xrep_t::xwrite_activated (pipe_t *pipe_)
int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
{
- // If this is the first part of the message it's the identity of the
+ // If this is the first part of the message it's the ID of the
// peer to send the message to.
if (!more_out) {
zmq_assert (!current_out);
@@ -127,10 +145,11 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
more_out = true;
- // Find the pipe associated with the identity stored in the prefix.
+ // Find the pipe associated with the peer ID stored in the prefix.
// If there's no such pipe just silently ignore the message.
- blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
- outpipes_t::iterator it = outpipes.find (identity);
+ zmq_assert (msg_->size () == 4);
+ uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
+ outpipes_t::iterator it = outpipes.find (peer_id);
if (it != outpipes.end ()) {
current_out = it->second.pipe;
@@ -220,10 +239,10 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// If we have a message, create a prefix and return it to the caller.
if (prefetched) {
- int rc = msg_->init_size (inpipes [current_in].identity.size ());
+ int rc = msg_->init_size (4);
errno_assert (rc == 0);
- memcpy (msg_->data (), inpipes [current_in].identity.data (),
- msg_->size ());
+ put_uint32 ((unsigned char*) msg_->data (),
+ inpipes [current_in].peer_id);
msg_->set_flags (msg_t::label);
return 0;
}