summaryrefslogtreecommitdiff
path: root/src/xrep.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-11-04 08:00:47 +0100
committerMartin Sustrik <sustrik@250bpm.com>2011-11-04 08:00:47 +0100
commita4843b65d24f9caa188bb2454b28080f0cee8484 (patch)
tree9fb56a811f693c968074433fe899ce7dd55a76fe /src/xrep.cpp
parentd20ea25b8c63e148fe48cc2b85bac9c896f1073b (diff)
Identities re-introduced
However, the "durable socket" behaviour wasn't re-added. Identities are used solely for routing in REQ/REP pattern. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xrep.cpp')
-rw-r--r--src/xrep.cpp102
1 files changed, 63 insertions, 39 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 350d752..ea19e56 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -43,6 +43,9 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
// all the outstanding requests from that peer.
// options.delay_on_disconnect = false;
+ options.send_identity = true;
+ options.recv_identity = true;
+
prefetched_msg.init ();
}
@@ -56,33 +59,22 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
- // Generate a new peer ID. Take care to avoid duplicates.
- outpipes_t::iterator it = outpipes.lower_bound (next_peer_id);
- if (!outpipes.empty ()) {
- while (true) {
- if (it == outpipes.end ())
- it = outpipes.begin ();
- if (it->first != next_peer_id)
- break;
- ++next_peer_id;
- ++it;
- }
- }
+ // Generate a new unique peer identity.
+ unsigned char buf [5];
+ buf [0] = 0;
+ put_uint32 (buf + 1, next_peer_id);
+ blob_t identity (buf, 5);
+ ++next_peer_id;
// Add the pipe to the map out outbound pipes.
outpipe_t outpipe = {pipe_, true};
bool ok = outpipes.insert (outpipes_t::value_type (
- next_peer_id, outpipe)).second;
+ identity, outpipe)).second;
zmq_assert (ok);
// Add the pipe to the list of inbound pipes.
- pipe_->set_pipe_id (next_peer_id);
- fq.attach (pipe_);
-
- // Advance next peer ID so that if new connection is dropped shortly after
- // its creation we don't accidentally get two subsequent peers with
- // the same ID.
- ++next_peer_id;
+ pipe_->set_identity (identity);
+ fq.attach (pipe_);
}
void zmq::xrep_t::xterminated (pipe_t *pipe_)
@@ -133,26 +125,25 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
more_out = true;
- // Find the pipe associated with the peer ID stored in the prefix.
+ // Find the pipe associated with the identity stored in the prefix.
// If there's no such pipe just silently ignore the message.
- if (msg_->size () == 4) {
- uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
- outpipes_t::iterator it = outpipes.find (peer_id);
-
- if (it != outpipes.end ()) {
- current_out = it->second.pipe;
- msg_t empty;
- int rc = empty.init ();
- errno_assert (rc == 0);
- if (!current_out->check_write (&empty)) {
- it->second.active = false;
- more_out = false;
- current_out = NULL;
- }
- rc = empty.close ();
- errno_assert (rc == 0);
+ blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
+ outpipes_t::iterator it = outpipes.find (identity);
+
+ if (it != outpipes.end ()) {
+ current_out = it->second.pipe;
+ msg_t empty;
+ int rc = empty.init ();
+ errno_assert (rc == 0);
+ if (!current_out->check_write (&empty)) {
+ it->second.active = false;
+ more_out = false;
+ current_out = NULL;
}
+ rc = empty.close ();
+ errno_assert (rc == 0);
}
+
}
int rc = msg_->close ();
@@ -204,6 +195,37 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
if (rc != 0)
return -1;
+ // If identity is received, change the key assigned to the pipe.
+ if (unlikely (msg_->flags () & msg_t::identity)) {
+ zmq_assert (!more_in);
+
+ // Empty identity means we can preserve the auto-generated identity.
+ if (msg_->size () != 0) {
+
+ // Actual change of the identity.
+ outpipes_t::iterator it = outpipes.begin ();
+ while (it != outpipes.end ()) {
+ if (it->second.pipe == pipe) {
+ blob_t identity ((unsigned char*) msg_->data (),
+ msg_->size ());
+ pipe->set_identity (identity);
+ outpipes.erase (it);
+ outpipe_t outpipe = {pipe, true};
+ outpipes.insert (outpipes_t::value_type (identity,
+ outpipe));
+ break;
+ }
+ ++it;
+ }
+ zmq_assert (it != outpipes.end ());
+ }
+
+ // After processing the identity, try to get the next message.
+ rc = fq.recvpipe (msg_, flags_, &pipe);
+ if (rc != 0)
+ return -1;
+ }
+
// If we are in the middle of reading a message, just return the next part.
if (more_in) {
more_in = msg_->flags () & msg_t::more ? true : false;
@@ -217,9 +239,11 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
prefetched = true;
rc = msg_->close ();
errno_assert (rc == 0);
- rc = msg_->init_size (4);
+
+ blob_t identity = pipe->get_identity ();
+ rc = msg_->init_size (identity.size ());
errno_assert (rc == 0);
- put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ());
+ memcpy (msg_->data (), identity.data (), identity.size ());
msg_->set_flags (msg_t::more);
return 0;
}