From 4b832ea37410035bba7b85e5e9988af8eda648c2 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 31 Oct 2011 15:56:39 +0100 Subject: Revert the early dropping of request and replies for disconnected clients Signed-off-by: Martin Sustrik --- src/xrep.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'src/xrep.cpp') diff --git a/src/xrep.cpp b/src/xrep.cpp index 9f2a947..6ca39f4 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -35,9 +35,11 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : { options.type = ZMQ_XREP; + // TODO: Uncomment the following line when XREP will become true XREP + // rather than generic router socket. // If peer disconnect there's noone to send reply to anyway. We can drop // all the outstanding requests from that peer. - options.delay_on_disconnect = false; + // options.delay_on_disconnect = false; prefetched_msg.init (); } -- cgit v1.2.3 From ac7717b7b35f441fc3aeeb1528e63f147c00913a Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 31 Oct 2011 16:20:30 +0100 Subject: 250bpm copyrights added Signed-off-by: Martin Sustrik --- src/xrep.cpp | 1 + 1 file changed, 1 insertion(+) (limited to 'src/xrep.cpp') diff --git a/src/xrep.cpp b/src/xrep.cpp index 6ca39f4..61e703b 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -1,4 +1,5 @@ /* + Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file -- cgit v1.2.3 From 7842c7107358324e8c5b9af7272e6dcab8c97931 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 1 Nov 2011 13:39:54 +0100 Subject: LABELS and COMMANDs removed Signed-off-by: Martin Sustrik --- src/xrep.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'src/xrep.cpp') diff --git a/src/xrep.cpp b/src/xrep.cpp index 61e703b..1b0f44d 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -128,7 +129,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) // If we have malformed message (prefix with no subsequent message) // then just silently ignore it. // TODO: The connections should be killed instead. - if (msg_->flags () & msg_t::label) { + if (msg_->flags () & msg_t::more) { more_out = true; @@ -162,7 +163,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) } // Check whether this is the last part of the message. - more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more_out = msg_->flags () & msg_t::more ? true : false; // Push the message into the pipe. If there's no out pipe, just drop it. if (current_out) { @@ -192,7 +193,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) if (prefetched) { int rc = msg_->move (prefetched_msg); errno_assert (rc == 0); - more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more_in = msg_->flags () & msg_t::more ? true : false; prefetched = false; return 0; } @@ -205,7 +206,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) // If we are in the middle of reading a message, just return the next part. if (more_in) { - more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more_in = msg_->flags () & msg_t::more ? true : false; return 0; } @@ -219,7 +220,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) rc = msg_->init_size (4); errno_assert (rc == 0); put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ()); - msg_->set_flags (msg_t::label); + msg_->set_flags (msg_t::more); return 0; } -- cgit v1.2.3 From 8e21d64c974344b5b2b83cac85d12c51392fe74b Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 1 Nov 2011 18:06:11 +0100 Subject: Copyright dates adjusted to reflect reality Signed-off-by: Martin Sustrik --- src/xrep.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/xrep.cpp') diff --git a/src/xrep.cpp b/src/xrep.cpp index 1b0f44d..350d752 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -1,6 +1,6 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 iMatix Corporation Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file -- cgit v1.2.3 From a4843b65d24f9caa188bb2454b28080f0cee8484 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 4 Nov 2011 08:00:47 +0100 Subject: Identities re-introduced However, the "durable socket" behaviour wasn't re-added. Identities are used solely for routing in REQ/REP pattern. Signed-off-by: Martin Sustrik --- src/xrep.cpp | 102 ++++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 63 insertions(+), 39 deletions(-) (limited to 'src/xrep.cpp') diff --git a/src/xrep.cpp b/src/xrep.cpp index 350d752..ea19e56 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -43,6 +43,9 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : // all the outstanding requests from that peer. // options.delay_on_disconnect = false; + options.send_identity = true; + options.recv_identity = true; + prefetched_msg.init (); } @@ -56,33 +59,22 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_) { zmq_assert (pipe_); - // Generate a new peer ID. Take care to avoid duplicates. - outpipes_t::iterator it = outpipes.lower_bound (next_peer_id); - if (!outpipes.empty ()) { - while (true) { - if (it == outpipes.end ()) - it = outpipes.begin (); - if (it->first != next_peer_id) - break; - ++next_peer_id; - ++it; - } - } + // Generate a new unique peer identity. + unsigned char buf [5]; + buf [0] = 0; + put_uint32 (buf + 1, next_peer_id); + blob_t identity (buf, 5); + ++next_peer_id; // Add the pipe to the map out outbound pipes. outpipe_t outpipe = {pipe_, true}; bool ok = outpipes.insert (outpipes_t::value_type ( - next_peer_id, outpipe)).second; + identity, outpipe)).second; zmq_assert (ok); // Add the pipe to the list of inbound pipes. - pipe_->set_pipe_id (next_peer_id); - fq.attach (pipe_); - - // Advance next peer ID so that if new connection is dropped shortly after - // its creation we don't accidentally get two subsequent peers with - // the same ID. - ++next_peer_id; + pipe_->set_identity (identity); + fq.attach (pipe_); } void zmq::xrep_t::xterminated (pipe_t *pipe_) @@ -133,26 +125,25 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) more_out = true; - // Find the pipe associated with the peer ID stored in the prefix. + // Find the pipe associated with the identity stored in the prefix. // If there's no such pipe just silently ignore the message. - if (msg_->size () == 4) { - uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ()); - outpipes_t::iterator it = outpipes.find (peer_id); - - if (it != outpipes.end ()) { - current_out = it->second.pipe; - msg_t empty; - int rc = empty.init (); - errno_assert (rc == 0); - if (!current_out->check_write (&empty)) { - it->second.active = false; - more_out = false; - current_out = NULL; - } - rc = empty.close (); - errno_assert (rc == 0); + blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); + outpipes_t::iterator it = outpipes.find (identity); + + if (it != outpipes.end ()) { + current_out = it->second.pipe; + msg_t empty; + int rc = empty.init (); + errno_assert (rc == 0); + if (!current_out->check_write (&empty)) { + it->second.active = false; + more_out = false; + current_out = NULL; } + rc = empty.close (); + errno_assert (rc == 0); } + } int rc = msg_->close (); @@ -204,6 +195,37 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) if (rc != 0) return -1; + // If identity is received, change the key assigned to the pipe. + if (unlikely (msg_->flags () & msg_t::identity)) { + zmq_assert (!more_in); + + // Empty identity means we can preserve the auto-generated identity. + if (msg_->size () != 0) { + + // Actual change of the identity. + outpipes_t::iterator it = outpipes.begin (); + while (it != outpipes.end ()) { + if (it->second.pipe == pipe) { + blob_t identity ((unsigned char*) msg_->data (), + msg_->size ()); + pipe->set_identity (identity); + outpipes.erase (it); + outpipe_t outpipe = {pipe, true}; + outpipes.insert (outpipes_t::value_type (identity, + outpipe)); + break; + } + ++it; + } + zmq_assert (it != outpipes.end ()); + } + + // After processing the identity, try to get the next message. + rc = fq.recvpipe (msg_, flags_, &pipe); + if (rc != 0) + return -1; + } + // If we are in the middle of reading a message, just return the next part. if (more_in) { more_in = msg_->flags () & msg_t::more ? true : false; @@ -217,9 +239,11 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) prefetched = true; rc = msg_->close (); errno_assert (rc == 0); - rc = msg_->init_size (4); + + blob_t identity = pipe->get_identity (); + rc = msg_->init_size (identity.size ()); errno_assert (rc == 0); - put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ()); + memcpy (msg_->data (), identity.data (), identity.size ()); msg_->set_flags (msg_t::more); return 0; } -- cgit v1.2.3