diff options
author | sustrik <sustrik@250bpm.com> | 2011-11-04 02:15:37 -0700 |
---|---|---|
committer | sustrik <sustrik@250bpm.com> | 2011-11-04 02:15:37 -0700 |
commit | 6cdd720400ea456ccbfdf09cdc5054ab07dbdc6f (patch) | |
tree | 6a25ede64ac1252e022feb91b1342cdc38e3dcf5 /src/xrep.cpp | |
parent | 541b83bc02784c721efa3d9dde8f8a191c3c3b7b (diff) | |
parent | e9c3a227a7175b4eda5193b1c8ce6985f5ed89f3 (diff) |
Merge pull request #220 from 250bpm/HEAD
Refactoring
Diffstat (limited to 'src/xrep.cpp')
-rw-r--r-- | src/xrep.cpp | 120 |
1 files changed, 74 insertions, 46 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp index 9f2a947..ea19e56 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -35,9 +37,14 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : { options.type = ZMQ_XREP; + // TODO: Uncomment the following line when XREP will become true XREP + // rather than generic router socket. // If peer disconnect there's noone to send reply to anyway. We can drop // all the outstanding requests from that peer. - options.delay_on_disconnect = false; + // options.delay_on_disconnect = false; + + options.send_identity = true; + options.recv_identity = true; prefetched_msg.init (); } @@ -52,33 +59,22 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_) { zmq_assert (pipe_); - // Generate a new peer ID. Take care to avoid duplicates. - outpipes_t::iterator it = outpipes.lower_bound (next_peer_id); - if (!outpipes.empty ()) { - while (true) { - if (it == outpipes.end ()) - it = outpipes.begin (); - if (it->first != next_peer_id) - break; - ++next_peer_id; - ++it; - } - } + // Generate a new unique peer identity. + unsigned char buf [5]; + buf [0] = 0; + put_uint32 (buf + 1, next_peer_id); + blob_t identity (buf, 5); + ++next_peer_id; // Add the pipe to the map out outbound pipes. outpipe_t outpipe = {pipe_, true}; bool ok = outpipes.insert (outpipes_t::value_type ( - next_peer_id, outpipe)).second; + identity, outpipe)).second; zmq_assert (ok); // Add the pipe to the list of inbound pipes. - pipe_->set_pipe_id (next_peer_id); - fq.attach (pipe_); - - // Advance next peer ID so that if new connection is dropped shortly after - // its creation we don't accidentally get two subsequent peers with - // the same ID. - ++next_peer_id; + pipe_->set_identity (identity); + fq.attach (pipe_); } void zmq::xrep_t::xterminated (pipe_t *pipe_) @@ -125,30 +121,29 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) // If we have malformed message (prefix with no subsequent message) // then just silently ignore it. // TODO: The connections should be killed instead. - if (msg_->flags () & msg_t::label) { + if (msg_->flags () & msg_t::more) { more_out = true; - // Find the pipe associated with the peer ID stored in the prefix. + // Find the pipe associated with the identity stored in the prefix. // If there's no such pipe just silently ignore the message. - if (msg_->size () == 4) { - uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ()); - outpipes_t::iterator it = outpipes.find (peer_id); - - if (it != outpipes.end ()) { - current_out = it->second.pipe; - msg_t empty; - int rc = empty.init (); - errno_assert (rc == 0); - if (!current_out->check_write (&empty)) { - it->second.active = false; - more_out = false; - current_out = NULL; - } - rc = empty.close (); - errno_assert (rc == 0); + blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); + outpipes_t::iterator it = outpipes.find (identity); + + if (it != outpipes.end ()) { + current_out = it->second.pipe; + msg_t empty; + int rc = empty.init (); + errno_assert (rc == 0); + if (!current_out->check_write (&empty)) { + it->second.active = false; + more_out = false; + current_out = NULL; } + rc = empty.close (); + errno_assert (rc == 0); } + } int rc = msg_->close (); @@ -159,7 +154,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) } // Check whether this is the last part of the message. - more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more_out = msg_->flags () & msg_t::more ? true : false; // Push the message into the pipe. If there's no out pipe, just drop it. if (current_out) { @@ -189,7 +184,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) if (prefetched) { int rc = msg_->move (prefetched_msg); errno_assert (rc == 0); - more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more_in = msg_->flags () & msg_t::more ? true : false; prefetched = false; return 0; } @@ -200,9 +195,40 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) if (rc != 0) return -1; + // If identity is received, change the key assigned to the pipe. + if (unlikely (msg_->flags () & msg_t::identity)) { + zmq_assert (!more_in); + + // Empty identity means we can preserve the auto-generated identity. + if (msg_->size () != 0) { + + // Actual change of the identity. + outpipes_t::iterator it = outpipes.begin (); + while (it != outpipes.end ()) { + if (it->second.pipe == pipe) { + blob_t identity ((unsigned char*) msg_->data (), + msg_->size ()); + pipe->set_identity (identity); + outpipes.erase (it); + outpipe_t outpipe = {pipe, true}; + outpipes.insert (outpipes_t::value_type (identity, + outpipe)); + break; + } + ++it; + } + zmq_assert (it != outpipes.end ()); + } + + // After processing the identity, try to get the next message. + rc = fq.recvpipe (msg_, flags_, &pipe); + if (rc != 0) + return -1; + } + // If we are in the middle of reading a message, just return the next part. if (more_in) { - more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more_in = msg_->flags () & msg_t::more ? true : false; return 0; } @@ -213,10 +239,12 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) prefetched = true; rc = msg_->close (); errno_assert (rc == 0); - rc = msg_->init_size (4); + + blob_t identity = pipe->get_identity (); + rc = msg_->init_size (identity.size ()); errno_assert (rc == 0); - put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ()); - msg_->set_flags (msg_t::label); + memcpy (msg_->data (), identity.data (), identity.size ()); + msg_->set_flags (msg_t::more); return 0; } |