From a4843b65d24f9caa188bb2454b28080f0cee8484 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 4 Nov 2011 08:00:47 +0100 Subject: Identities re-introduced However, the "durable socket" behaviour wasn't re-added. Identities are used solely for routing in REQ/REP pattern. Signed-off-by: Martin Sustrik --- src/xrep.cpp | 102 ++++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 63 insertions(+), 39 deletions(-) (limited to 'src/xrep.cpp') diff --git a/src/xrep.cpp b/src/xrep.cpp index 350d752..ea19e56 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -43,6 +43,9 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : // all the outstanding requests from that peer. // options.delay_on_disconnect = false; + options.send_identity = true; + options.recv_identity = true; + prefetched_msg.init (); } @@ -56,33 +59,22 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_) { zmq_assert (pipe_); - // Generate a new peer ID. Take care to avoid duplicates. - outpipes_t::iterator it = outpipes.lower_bound (next_peer_id); - if (!outpipes.empty ()) { - while (true) { - if (it == outpipes.end ()) - it = outpipes.begin (); - if (it->first != next_peer_id) - break; - ++next_peer_id; - ++it; - } - } + // Generate a new unique peer identity. + unsigned char buf [5]; + buf [0] = 0; + put_uint32 (buf + 1, next_peer_id); + blob_t identity (buf, 5); + ++next_peer_id; // Add the pipe to the map out outbound pipes. outpipe_t outpipe = {pipe_, true}; bool ok = outpipes.insert (outpipes_t::value_type ( - next_peer_id, outpipe)).second; + identity, outpipe)).second; zmq_assert (ok); // Add the pipe to the list of inbound pipes. - pipe_->set_pipe_id (next_peer_id); - fq.attach (pipe_); - - // Advance next peer ID so that if new connection is dropped shortly after - // its creation we don't accidentally get two subsequent peers with - // the same ID. - ++next_peer_id; + pipe_->set_identity (identity); + fq.attach (pipe_); } void zmq::xrep_t::xterminated (pipe_t *pipe_) @@ -133,26 +125,25 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) more_out = true; - // Find the pipe associated with the peer ID stored in the prefix. + // Find the pipe associated with the identity stored in the prefix. // If there's no such pipe just silently ignore the message. - if (msg_->size () == 4) { - uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ()); - outpipes_t::iterator it = outpipes.find (peer_id); - - if (it != outpipes.end ()) { - current_out = it->second.pipe; - msg_t empty; - int rc = empty.init (); - errno_assert (rc == 0); - if (!current_out->check_write (&empty)) { - it->second.active = false; - more_out = false; - current_out = NULL; - } - rc = empty.close (); - errno_assert (rc == 0); + blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); + outpipes_t::iterator it = outpipes.find (identity); + + if (it != outpipes.end ()) { + current_out = it->second.pipe; + msg_t empty; + int rc = empty.init (); + errno_assert (rc == 0); + if (!current_out->check_write (&empty)) { + it->second.active = false; + more_out = false; + current_out = NULL; } + rc = empty.close (); + errno_assert (rc == 0); } + } int rc = msg_->close (); @@ -204,6 +195,37 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) if (rc != 0) return -1; + // If identity is received, change the key assigned to the pipe. + if (unlikely (msg_->flags () & msg_t::identity)) { + zmq_assert (!more_in); + + // Empty identity means we can preserve the auto-generated identity. + if (msg_->size () != 0) { + + // Actual change of the identity. + outpipes_t::iterator it = outpipes.begin (); + while (it != outpipes.end ()) { + if (it->second.pipe == pipe) { + blob_t identity ((unsigned char*) msg_->data (), + msg_->size ()); + pipe->set_identity (identity); + outpipes.erase (it); + outpipe_t outpipe = {pipe, true}; + outpipes.insert (outpipes_t::value_type (identity, + outpipe)); + break; + } + ++it; + } + zmq_assert (it != outpipes.end ()); + } + + // After processing the identity, try to get the next message. + rc = fq.recvpipe (msg_, flags_, &pipe); + if (rc != 0) + return -1; + } + // If we are in the middle of reading a message, just return the next part. if (more_in) { more_in = msg_->flags () & msg_t::more ? true : false; @@ -217,9 +239,11 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) prefetched = true; rc = msg_->close (); errno_assert (rc == 0); - rc = msg_->init_size (4); + + blob_t identity = pipe->get_identity (); + rc = msg_->init_size (identity.size ()); errno_assert (rc == 0); - put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ()); + memcpy (msg_->data (), identity.data (), identity.size ()); msg_->set_flags (msg_t::more); return 0; } -- cgit v1.2.3