summaryrefslogtreecommitdiff
path: root/src/xrep.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-11-04 09:48:25 +0100
committerMartin Sustrik <sustrik@250bpm.com>2011-11-04 09:48:25 +0100
commit05ce301f3571e3e690792a189cb927328163f0bc (patch)
tree1145b921033a2b88e9b987ec1d3e03060559702d /src/xrep.cpp
parenta8362abf11b51dd553766fb07a9e60f28e788126 (diff)
parent6cdd720400ea456ccbfdf09cdc5054ab07dbdc6f (diff)
Merge branch 'master' of github.com:zeromq/libzmq
Diffstat (limited to 'src/xrep.cpp')
-rw-r--r--src/xrep.cpp120
1 files changed, 74 insertions, 46 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 9f2a947..ea19e56 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -35,9 +37,14 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
{
options.type = ZMQ_XREP;
+ // TODO: Uncomment the following line when XREP will become true XREP
+ // rather than generic router socket.
// If peer disconnect there's noone to send reply to anyway. We can drop
// all the outstanding requests from that peer.
- options.delay_on_disconnect = false;
+ // options.delay_on_disconnect = false;
+
+ options.send_identity = true;
+ options.recv_identity = true;
prefetched_msg.init ();
}
@@ -52,33 +59,22 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
- // Generate a new peer ID. Take care to avoid duplicates.
- outpipes_t::iterator it = outpipes.lower_bound (next_peer_id);
- if (!outpipes.empty ()) {
- while (true) {
- if (it == outpipes.end ())
- it = outpipes.begin ();
- if (it->first != next_peer_id)
- break;
- ++next_peer_id;
- ++it;
- }
- }
+ // Generate a new unique peer identity.
+ unsigned char buf [5];
+ buf [0] = 0;
+ put_uint32 (buf + 1, next_peer_id);
+ blob_t identity (buf, 5);
+ ++next_peer_id;
// Add the pipe to the map out outbound pipes.
outpipe_t outpipe = {pipe_, true};
bool ok = outpipes.insert (outpipes_t::value_type (
- next_peer_id, outpipe)).second;
+ identity, outpipe)).second;
zmq_assert (ok);
// Add the pipe to the list of inbound pipes.
- pipe_->set_pipe_id (next_peer_id);
- fq.attach (pipe_);
-
- // Advance next peer ID so that if new connection is dropped shortly after
- // its creation we don't accidentally get two subsequent peers with
- // the same ID.
- ++next_peer_id;
+ pipe_->set_identity (identity);
+ fq.attach (pipe_);
}
void zmq::xrep_t::xterminated (pipe_t *pipe_)
@@ -125,30 +121,29 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
// TODO: The connections should be killed instead.
- if (msg_->flags () & msg_t::label) {
+ if (msg_->flags () & msg_t::more) {
more_out = true;
- // Find the pipe associated with the peer ID stored in the prefix.
+ // Find the pipe associated with the identity stored in the prefix.
// If there's no such pipe just silently ignore the message.
- if (msg_->size () == 4) {
- uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
- outpipes_t::iterator it = outpipes.find (peer_id);
-
- if (it != outpipes.end ()) {
- current_out = it->second.pipe;
- msg_t empty;
- int rc = empty.init ();
- errno_assert (rc == 0);
- if (!current_out->check_write (&empty)) {
- it->second.active = false;
- more_out = false;
- current_out = NULL;
- }
- rc = empty.close ();
- errno_assert (rc == 0);
+ blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
+ outpipes_t::iterator it = outpipes.find (identity);
+
+ if (it != outpipes.end ()) {
+ current_out = it->second.pipe;
+ msg_t empty;
+ int rc = empty.init ();
+ errno_assert (rc == 0);
+ if (!current_out->check_write (&empty)) {
+ it->second.active = false;
+ more_out = false;
+ current_out = NULL;
}
+ rc = empty.close ();
+ errno_assert (rc == 0);
}
+
}
int rc = msg_->close ();
@@ -159,7 +154,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
}
// Check whether this is the last part of the message.
- more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more_out = msg_->flags () & msg_t::more ? true : false;
// Push the message into the pipe. If there's no out pipe, just drop it.
if (current_out) {
@@ -189,7 +184,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
if (prefetched) {
int rc = msg_->move (prefetched_msg);
errno_assert (rc == 0);
- more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more_in = msg_->flags () & msg_t::more ? true : false;
prefetched = false;
return 0;
}
@@ -200,9 +195,40 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
if (rc != 0)
return -1;
+ // If identity is received, change the key assigned to the pipe.
+ if (unlikely (msg_->flags () & msg_t::identity)) {
+ zmq_assert (!more_in);
+
+ // Empty identity means we can preserve the auto-generated identity.
+ if (msg_->size () != 0) {
+
+ // Actual change of the identity.
+ outpipes_t::iterator it = outpipes.begin ();
+ while (it != outpipes.end ()) {
+ if (it->second.pipe == pipe) {
+ blob_t identity ((unsigned char*) msg_->data (),
+ msg_->size ());
+ pipe->set_identity (identity);
+ outpipes.erase (it);
+ outpipe_t outpipe = {pipe, true};
+ outpipes.insert (outpipes_t::value_type (identity,
+ outpipe));
+ break;
+ }
+ ++it;
+ }
+ zmq_assert (it != outpipes.end ());
+ }
+
+ // After processing the identity, try to get the next message.
+ rc = fq.recvpipe (msg_, flags_, &pipe);
+ if (rc != 0)
+ return -1;
+ }
+
// If we are in the middle of reading a message, just return the next part.
if (more_in) {
- more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more_in = msg_->flags () & msg_t::more ? true : false;
return 0;
}
@@ -213,10 +239,12 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
prefetched = true;
rc = msg_->close ();
errno_assert (rc == 0);
- rc = msg_->init_size (4);
+
+ blob_t identity = pipe->get_identity ();
+ rc = msg_->init_size (identity.size ());
errno_assert (rc == 0);
- put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ());
- msg_->set_flags (msg_t::label);
+ memcpy (msg_->data (), identity.data (), identity.size ());
+ msg_->set_flags (msg_t::more);
return 0;
}