summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-06-22 11:02:16 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-06-22 11:02:16 +0200
commitec81f8fb2523e1e2fe45eaadc05311a35bf551d7 (patch)
treee6fbd9b7a789d72678fa02ca06883de15a932beb /src
parent10a93bb79fd3d4be1b3ffedfa6785564fbcc082b (diff)
New wire format for REQ/REP pattern
This patch introduces two changes: 1. 32-bit ID is used to identify the peer instead of UUID 2. REQ socket seeds the label stack with unique 32-bit request ID It also drops any replies with non-matching request ID Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am2
-rw-r--r--src/random.cpp39
-rw-r--r--src/random.hpp34
-rw-r--r--src/rep.cpp52
-rw-r--r--src/req.cpp38
-rw-r--r--src/req.hpp5
-rw-r--r--src/socket_base.cpp18
-rw-r--r--src/xrep.cpp45
-rw-r--r--src/xrep.hpp15
9 files changed, 179 insertions, 69 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 92ceb20..ae20d33 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -53,6 +53,7 @@ libzmq_la_SOURCES = \
pub.hpp \
pull.hpp \
push.hpp \
+ random.hpp \
reaper.hpp \
rep.hpp \
req.hpp \
@@ -117,6 +118,7 @@ libzmq_la_SOURCES = \
push.cpp \
reaper.cpp \
pub.cpp \
+ random.cpp \
rep.cpp \
req.cpp \
router.cpp \
diff --git a/src/random.cpp b/src/random.cpp
new file mode 100644
index 0000000..ee7a7fb
--- /dev/null
+++ b/src/random.cpp
@@ -0,0 +1,39 @@
+/*
+ Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "random.hpp"
+#include "uuid.hpp"
+#include "err.hpp"
+
+// Here we can use different ways of generating random data, as avialable
+// on different platforms. At the moment, we'll assume the UUID is random
+// enough to use for that purpose.
+void zmq::generate_random (void *buf_, size_t size_)
+{
+ // Collapsing an UUID into 4 bytes.
+ zmq_assert (size_ == 4);
+ uint32_t buff [4];
+ generate_uuid ((void*) buff);
+ uint32_t result = buff [0];
+ result ^= buff [1];
+ result ^= buff [2];
+ result ^= buff [3];
+ *((uint32_t*) buf_) = result;
+}
diff --git a/src/random.hpp b/src/random.hpp
new file mode 100644
index 0000000..0b99bbd
--- /dev/null
+++ b/src/random.hpp
@@ -0,0 +1,34 @@
+/*
+ Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_RANDOM_HPP_INCLUDED__
+#define __ZMQ_RANDOM_HPP_INCLUDED__
+
+#include <stddef.h>
+
+namespace zmq
+{
+
+ // Generates truly random bytes (not pseudo-random).
+ void generate_random (void *buf_, size_t size_);
+
+}
+
+#endif
diff --git a/src/rep.cpp b/src/rep.cpp
index b987d9c..a5d1462 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -64,54 +64,32 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
return -1;
}
+ // First thing to do when receiving a request is to copy all the labels
+ // to the reply pipe.
if (request_begins) {
-
- // Copy the backtrace stack to the reply pipe.
while (true) {
-
- // TODO: If request can be read but reply pipe is not
- // ready for writing, we should drop the reply.
-
- // Get next part of the backtrace stack.
int rc = xrep_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
+ if (!(msg_->flags () & msg_t::label))
+ break;
- if (msg_->flags () & (msg_t::more | msg_t::label)) {
-
- // Empty message part delimits the traceback stack.
- bool bottom = (msg_->size () == 0);
-
- // Push it to the reply pipe.
- rc = xrep_t::xsend (msg_, flags_);
- zmq_assert (rc == 0);
-
- // The end of the traceback, move to processing message body.
- if (bottom)
- break;
- }
- else {
-
- // If the traceback stack is malformed, discard anything
- // already sent to pipe (we're at end of invalid message)
- // and continue reading -- that'll switch us to the next pipe
- // and next request.
- rc = xrep_t::rollback ();
- zmq_assert (rc == 0);
- }
+ // TODO: If the reply cannot be sent to the peer because
+ // od congestion, we should drop it.
+ rc = xrep_t::xsend (msg_, flags_);
+ zmq_assert (rc == 0);
}
-
request_begins = false;
}
-
- // Now the routing info is safely stored. Get the first part
- // of the message payload and exit.
- int rc = xrep_t::xrecv (msg_, flags_);
- if (rc != 0)
- return rc;
+ else {
+ int rc = xrep_t::xrecv (msg_, flags_);
+ if (rc != 0)
+ return rc;
+ }
+ zmq_assert (!(msg_->flags () & msg_t::label));
// If whole request is read, flip the FSM to reply-sending state.
- if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
+ if (!(msg_->flags () & msg_t::more)) {
sending_reply = true;
request_begins = true;
}
diff --git a/src/req.cpp b/src/req.cpp
index b0e58dc..e51b853 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -21,13 +21,21 @@
#include "req.hpp"
#include "err.hpp"
#include "msg.hpp"
+#include "uuid.hpp"
+#include "wire.hpp"
+#include "random.hpp"
+#include "likely.hpp"
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) :
xreq_t (parent_, tid_),
receiving_reply (false),
- message_begins (true)
+ message_begins (true),
+ request_id (0)
{
options.type = ZMQ_REQ;
+
+ // Start the request ID sequence at an random point.
+ generate_random (&request_id, sizeof (request_id));
}
zmq::req_t::~req_t ()
@@ -43,12 +51,14 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
return -1;
}
- // First part of the request is empty message part (stack bottom).
+ // First part of the request is the request identity.
if (message_begins) {
msg_t prefix;
- int rc = prefix.init ();
+ int rc = prefix.init_size (4);
errno_assert (rc == 0);
prefix.set_flags (msg_t::label);
+ unsigned char *data = (unsigned char*) prefix.data ();
+ put_uint32 (data, request_id);
rc = xreq_t::xsend (&prefix, flags_);
if (rc != 0)
return rc;
@@ -78,13 +88,28 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
return -1;
}
- // First part of the reply should be empty message part (stack bottom).
+ // First part of the reply should be the original request ID.
if (message_begins) {
int rc = xreq_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
zmq_assert (msg_->flags () & msg_t::label);
- zmq_assert (msg_->size () == 0);
+ zmq_assert (msg_->size () == 4);
+ unsigned char *data = (unsigned char*) msg_->data ();
+ if (unlikely (get_uint32 (data) != request_id)) {
+
+ // The request ID does not match. Drop the entire message.
+ while (true) {
+ int rc = xreq_t::xrecv (msg_, flags_);
+ errno_assert (rc == 0);
+ if (!(msg_->flags () & (msg_t::label | msg_t::more)))
+ break;
+ }
+ msg_->close ();
+ msg_->init ();
+ errno = EAGAIN;
+ return -1;
+ }
message_begins = false;
}
@@ -94,6 +119,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
// If the reply is fully received, flip the FSM into request-sending state.
if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
+ request_id++;
receiving_reply = false;
message_begins = true;
}
@@ -103,6 +129,8 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
bool zmq::req_t::xhas_in ()
{
+ // TODO: Duplicates should be removed here.
+
if (!receiving_reply)
return false;
diff --git a/src/req.hpp b/src/req.hpp
index e0554ac..50dcb44 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -22,6 +22,7 @@
#define __ZMQ_REQ_HPP_INCLUDED__
#include "xreq.hpp"
+#include "stdint.hpp"
namespace zmq
{
@@ -49,6 +50,10 @@ namespace zmq
// of the message must be empty message part (backtrace stack bottom).
bool message_begins;
+ // Request ID. Request numbers gradually increase (and wrap over)
+ // so that we don't have to generate random ID for each request.
+ uint32_t request_id;
+
req_t (const req_t&);
const req_t &operator = (const req_t&);
};
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index eaf1776..804ec46 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -598,15 +598,15 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
ticks = 0;
rc = xrecv (msg_, flags_);
- if (rc == 0) {
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
- rcvmore = msg_->flags () & msg_t::more;
- if (rcvmore)
- msg_->reset_flags (msg_t::more);
- }
- return rc;
+ if (rc < 0)
+ return rc;
+ rcvlabel = msg_->flags () & msg_t::label;
+ if (rcvlabel)
+ msg_->reset_flags (msg_t::label);
+ rcvmore = msg_->flags () & msg_t::more;
+ if (rcvmore)
+ msg_->reset_flags (msg_t::more);
+ return 0;
}
// Compute the time when the timeout should occur.
diff --git a/src/xrep.cpp b/src/xrep.cpp
index b935c06..ab2f0a8 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -20,6 +20,8 @@
#include "xrep.hpp"
#include "pipe.hpp"
+#include "wire.hpp"
+#include "random.hpp"
#include "err.hpp"
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
@@ -32,9 +34,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
{
options.type = ZMQ_XREP;
- // On connect, pipes are created only after initial handshaking.
- // That way we are aware of the peer's identity when binding to the pipes.
- options.immediate_connect = false;
+ // Start the peer ID sequence from a random point.
+ generate_random (&next_peer_id, sizeof (next_peer_id));
}
zmq::xrep_t::~xrep_t ()
@@ -47,16 +48,33 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
zmq_assert (pipe_);
+ // Generate a new peer ID. Take care to avoid duplicates.
+ outpipes_t::iterator it = outpipes.lower_bound (next_peer_id);
+ if (!outpipes.empty ()) {
+ while (true) {
+ if (it == outpipes.end ())
+ it = outpipes.begin ();
+ if (it->first != next_peer_id)
+ break;
+ ++next_peer_id;
+ ++it;
+ }
+ }
+
// Add the pipe to the map out outbound pipes.
- // TODO: What if new connection has same peer identity as the old one?
outpipe_t outpipe = {pipe_, true};
bool ok = outpipes.insert (outpipes_t::value_type (
- peer_identity_, outpipe)).second;
+ next_peer_id, outpipe)).second;
zmq_assert (ok);
// Add the pipe to the list of inbound pipes.
- inpipe_t inpipe = {pipe_, peer_identity_, true};
+ inpipe_t inpipe = {pipe_, next_peer_id, true};
inpipes.push_back (inpipe);
+
+ // Advance next peer ID so that if new connection is dropped shortly after
+ // its creation we don't accidentally get two subsequent peers with
+ // the same ID.
+ ++next_peer_id;
}
void zmq::xrep_t::xterminated (pipe_t *pipe_)
@@ -115,7 +133,7 @@ void zmq::xrep_t::xwrite_activated (pipe_t *pipe_)
int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
{
- // If this is the first part of the message it's the identity of the
+ // If this is the first part of the message it's the ID of the
// peer to send the message to.
if (!more_out) {
zmq_assert (!current_out);
@@ -127,10 +145,11 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
more_out = true;
- // Find the pipe associated with the identity stored in the prefix.
+ // Find the pipe associated with the peer ID stored in the prefix.
// If there's no such pipe just silently ignore the message.
- blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
- outpipes_t::iterator it = outpipes.find (identity);
+ zmq_assert (msg_->size () == 4);
+ uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
+ outpipes_t::iterator it = outpipes.find (peer_id);
if (it != outpipes.end ()) {
current_out = it->second.pipe;
@@ -220,10 +239,10 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// If we have a message, create a prefix and return it to the caller.
if (prefetched) {
- int rc = msg_->init_size (inpipes [current_in].identity.size ());
+ int rc = msg_->init_size (4);
errno_assert (rc == 0);
- memcpy (msg_->data (), inpipes [current_in].identity.data (),
- msg_->size ());
+ put_uint32 ((unsigned char*) msg_->data (),
+ inpipes [current_in].peer_id);
msg_->set_flags (msg_t::label);
return 0;
}
diff --git a/src/xrep.hpp b/src/xrep.hpp
index fbc7385..d5014e0 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -25,7 +25,7 @@
#include <vector>
#include "socket_base.hpp"
-#include "blob.hpp"
+#include "stdint.hpp"
#include "msg.hpp"
namespace zmq
@@ -41,7 +41,8 @@ namespace zmq
~xrep_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
+ void xattach_pipe (class pipe_t *pipe_,
+ const blob_t &peer_identity_);
int xsend (class msg_t *msg_, int flags_);
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
@@ -60,7 +61,7 @@ namespace zmq
struct inpipe_t
{
class pipe_t *pipe;
- blob_t identity;
+ uint32_t peer_id;
bool active;
};
@@ -86,8 +87,8 @@ namespace zmq
bool active;
};
- // Outbound pipes indexed by the peer names.
- typedef std::map <blob_t, outpipe_t> outpipes_t;
+ // Outbound pipes indexed by the peer IDs.
+ typedef std::map <uint32_t, outpipe_t> outpipes_t;
outpipes_t outpipes;
// The pipe we are currently writing to.
@@ -96,6 +97,10 @@ namespace zmq
// If true, more outgoing message parts are expected.
bool more_out;
+ // Peer ID are generated. It's a simple increment and wrap-over
+ // algorithm. This value is the next ID to use (if not used already).
+ uint32_t next_peer_id;
+
xrep_t (const xrep_t&);
const xrep_t &operator = (const xrep_t&);
};