summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/msg.hpp1
-rw-r--r--src/options.cpp4
-rw-r--r--src/options.hpp6
-rw-r--r--src/pipe.cpp11
-rw-r--r--src/pipe.hpp10
-rw-r--r--src/req.cpp15
-rw-r--r--src/req.hpp1
-rw-r--r--src/session_base.cpp20
-rw-r--r--src/session_base.hpp5
-rw-r--r--src/socket_base.cpp9
-rw-r--r--src/xrep.cpp102
-rw-r--r--src/xrep.hpp4
-rw-r--r--src/xreq.cpp14
-rw-r--r--tests/test_invalid_rep.cpp17
14 files changed, 155 insertions, 64 deletions
diff --git a/src/msg.hpp b/src/msg.hpp
index f2f8fcf..8c84670 100644
--- a/src/msg.hpp
+++ b/src/msg.hpp
@@ -50,6 +50,7 @@ namespace zmq
enum
{
more = 1,
+ identity = 64,
shared = 128
};
diff --git a/src/options.cpp b/src/options.cpp
index aa94a21..4db1a6c 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -46,7 +46,9 @@ zmq::options_t::options_t () :
ipv4only (1),
delay_on_close (true),
delay_on_disconnect (true),
- filter (false)
+ filter (false),
+ send_identity (false),
+ recv_identity (false)
{
}
diff --git a/src/options.hpp b/src/options.hpp
index d017c00..bfc9dc7 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -99,6 +99,12 @@ namespace zmq
// If 1, (X)SUB socket should filter the messages. If 0, it should not.
bool filter;
+
+ // Sends identity to all new connections.
+ bool send_identity;
+
+ // Receivers identity from all new connections.
+ bool recv_identity;
};
}
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 9f44c94..25dd51c 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -65,8 +65,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
peer (NULL),
sink (NULL),
state (active),
- delay (delay_),
- pipe_id (0)
+ delay (delay_)
{
}
@@ -88,14 +87,14 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
sink = sink_;
}
-void zmq::pipe_t::set_pipe_id (uint32_t id_)
+void zmq::pipe_t::set_identity (const blob_t &identity_)
{
- pipe_id = id_;
+ identity = identity_;
}
-uint32_t zmq::pipe_t::get_pipe_id ()
+zmq::blob_t zmq::pipe_t::get_identity ()
{
- return pipe_id;
+ return identity;
}
bool zmq::pipe_t::check_read ()
diff --git a/src/pipe.hpp b/src/pipe.hpp
index 4533e58..75a2021 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -28,6 +29,7 @@
#include "object.hpp"
#include "stdint.hpp"
#include "array.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -71,8 +73,8 @@ namespace zmq
void set_event_sink (i_pipe_events *sink_);
// Pipe endpoint can store an opaque ID to be used by its clients.
- void set_pipe_id (uint32_t id_);
- uint32_t get_pipe_id ();
+ void set_identity (const blob_t &identity_);
+ blob_t get_identity ();
// Returns true if there is at least one message to read in the pipe.
bool check_read ();
@@ -183,8 +185,8 @@ namespace zmq
// asks us to.
bool delay;
- // Opaque ID. To be used by the clients, not the pipe itself.
- uint32_t pipe_id;
+ // Identity of the writer. Used uniquely by the reader side.
+ blob_t identity;
// Returns true if the message is delimiter; false otherwise.
static bool is_delimiter (msg_t &msg_);
diff --git a/src/req.cpp b/src/req.cpp
index 40c4765..3ba1ec0 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -147,23 +147,32 @@ zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
zmq::req_session_t::~req_session_t ()
{
+ state = options.recv_identity ? identity : bottom;
}
int zmq::req_session_t::write (msg_t *msg_)
{
- if (state == bottom) {
+ switch (state) {
+ case bottom:
if (msg_->flags () == msg_t::more && msg_->size () == 0) {
state = body;
return xreq_session_t::write (msg_);
}
- }
- else {
+ break;
+ case body:
if (msg_->flags () == msg_t::more)
return xreq_session_t::write (msg_);
if (msg_->flags () == 0) {
state = bottom;
return xreq_session_t::write (msg_);
}
+ break;
+ case identity:
+ if (msg_->flags () == 0) {
+ state = bottom;
+ return xreq_session_t::write (msg_);
+ }
+ break;
}
errno = EFAULT;
return -1;
diff --git a/src/req.hpp b/src/req.hpp
index 61066ca..8fae9d4 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -71,6 +71,7 @@ namespace zmq
private:
enum {
+ identity,
bottom,
body
} state;
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 4c5e512..f2ee713 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -112,7 +112,9 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
engine (NULL),
socket (socket_),
io_thread (io_thread_),
- has_linger_timer (false)
+ has_linger_timer (false),
+ send_identity (options_.send_identity),
+ recv_identity (options_.recv_identity)
{
if (protocol_)
protocol = protocol_;
@@ -146,6 +148,16 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
int zmq::session_base_t::read (msg_t *msg_)
{
+ // First message to send is identity (if required).
+ if (send_identity) {
+ zmq_assert (!(msg_->flags () & msg_t::more));
+ msg_->init_size (options.identity_size);
+ memcpy (msg_->data (), options.identity, options.identity_size);
+ send_identity = false;
+ incomplete_in = false;
+ return 0;
+ }
+
if (!pipe || !pipe->read (msg_)) {
errno = EAGAIN;
return -1;
@@ -157,6 +169,12 @@ int zmq::session_base_t::read (msg_t *msg_)
int zmq::session_base_t::write (msg_t *msg_)
{
+ // First message to receive is identity (if required).
+ if (recv_identity) {
+ msg_->set_flags (msg_t::identity);
+ recv_identity = false;
+ }
+
if (pipe && pipe->write (msg_)) {
int rc = msg_->init ();
errno_assert (rc == 0);
diff --git a/src/session_base.hpp b/src/session_base.hpp
index 86a670f..c89628f 100644
--- a/src/session_base.hpp
+++ b/src/session_base.hpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -119,6 +120,10 @@ namespace zmq
// True is linger timer is running.
bool has_linger_timer;
+ // If true, identity is to be sent/recvd from the network.
+ bool send_identity;
+ bool recv_identity;
+
// Protocol and address to use when connecting.
std::string protocol;
std::string address;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 583818b..a59ba69 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -845,7 +845,16 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
+ // Test whether IDENTITY flag is valid for this socket type.
+ if (unlikely (msg_->flags () & msg_t::identity)) {
+ zmq_assert (options.recv_identity);
+printf ("identity recvd\n");
+ }
+
+
+ // Remove MORE flag.
rcvmore = msg_->flags () & msg_t::more ? true : false;
if (rcvmore)
msg_->reset_flags (msg_t::more);
}
+
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 350d752..ea19e56 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -43,6 +43,9 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
// all the outstanding requests from that peer.
// options.delay_on_disconnect = false;
+ options.send_identity = true;
+ options.recv_identity = true;
+
prefetched_msg.init ();
}
@@ -56,33 +59,22 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
- // Generate a new peer ID. Take care to avoid duplicates.
- outpipes_t::iterator it = outpipes.lower_bound (next_peer_id);
- if (!outpipes.empty ()) {
- while (true) {
- if (it == outpipes.end ())
- it = outpipes.begin ();
- if (it->first != next_peer_id)
- break;
- ++next_peer_id;
- ++it;
- }
- }
+ // Generate a new unique peer identity.
+ unsigned char buf [5];
+ buf [0] = 0;
+ put_uint32 (buf + 1, next_peer_id);
+ blob_t identity (buf, 5);
+ ++next_peer_id;
// Add the pipe to the map out outbound pipes.
outpipe_t outpipe = {pipe_, true};
bool ok = outpipes.insert (outpipes_t::value_type (
- next_peer_id, outpipe)).second;
+ identity, outpipe)).second;
zmq_assert (ok);
// Add the pipe to the list of inbound pipes.
- pipe_->set_pipe_id (next_peer_id);
- fq.attach (pipe_);
-
- // Advance next peer ID so that if new connection is dropped shortly after
- // its creation we don't accidentally get two subsequent peers with
- // the same ID.
- ++next_peer_id;
+ pipe_->set_identity (identity);
+ fq.attach (pipe_);
}
void zmq::xrep_t::xterminated (pipe_t *pipe_)
@@ -133,26 +125,25 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
more_out = true;
- // Find the pipe associated with the peer ID stored in the prefix.
+ // Find the pipe associated with the identity stored in the prefix.
// If there's no such pipe just silently ignore the message.
- if (msg_->size () == 4) {
- uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
- outpipes_t::iterator it = outpipes.find (peer_id);
-
- if (it != outpipes.end ()) {
- current_out = it->second.pipe;
- msg_t empty;
- int rc = empty.init ();
- errno_assert (rc == 0);
- if (!current_out->check_write (&empty)) {
- it->second.active = false;
- more_out = false;
- current_out = NULL;
- }
- rc = empty.close ();
- errno_assert (rc == 0);
+ blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
+ outpipes_t::iterator it = outpipes.find (identity);
+
+ if (it != outpipes.end ()) {
+ current_out = it->second.pipe;
+ msg_t empty;
+ int rc = empty.init ();
+ errno_assert (rc == 0);
+ if (!current_out->check_write (&empty)) {
+ it->second.active = false;
+ more_out = false;
+ current_out = NULL;
}
+ rc = empty.close ();
+ errno_assert (rc == 0);
}
+
}
int rc = msg_->close ();
@@ -204,6 +195,37 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
if (rc != 0)
return -1;
+ // If identity is received, change the key assigned to the pipe.
+ if (unlikely (msg_->flags () & msg_t::identity)) {
+ zmq_assert (!more_in);
+
+ // Empty identity means we can preserve the auto-generated identity.
+ if (msg_->size () != 0) {
+
+ // Actual change of the identity.
+ outpipes_t::iterator it = outpipes.begin ();
+ while (it != outpipes.end ()) {
+ if (it->second.pipe == pipe) {
+ blob_t identity ((unsigned char*) msg_->data (),
+ msg_->size ());
+ pipe->set_identity (identity);
+ outpipes.erase (it);
+ outpipe_t outpipe = {pipe, true};
+ outpipes.insert (outpipes_t::value_type (identity,
+ outpipe));
+ break;
+ }
+ ++it;
+ }
+ zmq_assert (it != outpipes.end ());
+ }
+
+ // After processing the identity, try to get the next message.
+ rc = fq.recvpipe (msg_, flags_, &pipe);
+ if (rc != 0)
+ return -1;
+ }
+
// If we are in the middle of reading a message, just return the next part.
if (more_in) {
more_in = msg_->flags () & msg_t::more ? true : false;
@@ -217,9 +239,11 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
prefetched = true;
rc = msg_->close ();
errno_assert (rc == 0);
- rc = msg_->init_size (4);
+
+ blob_t identity = pipe->get_identity ();
+ rc = msg_->init_size (identity.size ());
errno_assert (rc == 0);
- put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ());
+ memcpy (msg_->data (), identity.data (), identity.size ());
msg_->set_flags (msg_t::more);
return 0;
}
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 8cec683..fc02b11 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -27,6 +28,7 @@
#include "socket_base.hpp"
#include "session_base.hpp"
#include "stdint.hpp"
+#include "blob.hpp"
#include "msg.hpp"
#include "fq.hpp"
@@ -78,7 +80,7 @@ namespace zmq
};
// Outbound pipes indexed by the peer IDs.
- typedef std::map <uint32_t, outpipe_t> outpipes_t;
+ typedef std::map <blob_t, outpipe_t> outpipes_t;
outpipes_t outpipes;
// The pipe we are currently writing to.
diff --git a/src/xreq.cpp b/src/xreq.cpp
index f4f962f..91317f7 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -1,5 +1,6 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -32,6 +33,9 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) :
// If the socket is closing we can drop all the outbound requests. There'll
// be noone to receive the replies anyway.
// options.delay_on_close = false;
+
+ options.send_identity = true;
+ options.recv_identity = true;
}
zmq::xreq_t::~xreq_t ()
@@ -52,7 +56,15 @@ int zmq::xreq_t::xsend (msg_t *msg_, int flags_)
int zmq::xreq_t::xrecv (msg_t *msg_, int flags_)
{
- return fq.recv (msg_, flags_);
+ // XREQ socket doesn't use identities. We can safely drop it and
+ while (true) {
+ int rc = fq.recv (msg_, flags_);
+ if (rc != 0)
+ return rc;
+ if (likely (!(msg_->flags () & msg_t::identity)))
+ break;
+ }
+ return 0;
}
bool zmq::xreq_t::xhas_in ()
diff --git a/tests/test_invalid_rep.cpp b/tests/test_invalid_rep.cpp
index ada7ee1..9c77cc4 100644
--- a/tests/test_invalid_rep.cpp
+++ b/tests/test_invalid_rep.cpp
@@ -25,7 +25,7 @@
int main (int argc, char *argv [])
{
- fprintf (stderr, "test_pair_inproc running...\n");
+ fprintf (stderr, "test_invalid_rep running...\n");
// Create REQ/XREP wiring.
void *ctx = zmq_init (1);
@@ -49,23 +49,24 @@ int main (int argc, char *argv [])
assert (rc == 1);
// Receive the request.
- char addr [4];
+ char addr [32];
+ int addr_size;
char bottom [1];
char body [1];
- rc = zmq_recv (xrep_socket, addr, sizeof (addr), 0);
- assert (rc == 4);
+ addr_size = zmq_recv (xrep_socket, addr, sizeof (addr), 0);
+ assert (addr_size >= 0);
rc = zmq_recv (xrep_socket, bottom, sizeof (bottom), 0);
assert (rc == 0);
rc = zmq_recv (xrep_socket, body, sizeof (body), 0);
assert (rc == 1);
// Send invalid reply.
- rc = zmq_send (xrep_socket, addr, 4, 0);
- assert (rc == 4);
+ rc = zmq_send (xrep_socket, addr, addr_size, 0);
+ assert (rc == addr_size);
// Send valid reply.
- rc = zmq_send (xrep_socket, addr, 4, ZMQ_SNDMORE);
- assert (rc == 4);
+ rc = zmq_send (xrep_socket, addr, addr_size, ZMQ_SNDMORE);
+ assert (rc == addr_size);
rc = zmq_send (xrep_socket, bottom, 0, ZMQ_SNDMORE);
assert (rc == 0);
rc = zmq_send (xrep_socket, "b", 1, 0);