diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-06-22 11:02:16 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-06-22 11:02:16 +0200 |
commit | ec81f8fb2523e1e2fe45eaadc05311a35bf551d7 (patch) | |
tree | e6fbd9b7a789d72678fa02ca06883de15a932beb | |
parent | 10a93bb79fd3d4be1b3ffedfa6785564fbcc082b (diff) |
New wire format for REQ/REP pattern
This patch introduces two changes:
1. 32-bit ID is used to identify the peer instead of UUID
2. REQ socket seeds the label stack with unique 32-bit request ID
It also drops any replies with non-matching request ID
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/random.cpp | 39 | ||||
-rw-r--r-- | src/random.hpp | 34 | ||||
-rw-r--r-- | src/rep.cpp | 52 | ||||
-rw-r--r-- | src/req.cpp | 38 | ||||
-rw-r--r-- | src/req.hpp | 5 | ||||
-rw-r--r-- | src/socket_base.cpp | 18 | ||||
-rw-r--r-- | src/xrep.cpp | 45 | ||||
-rw-r--r-- | src/xrep.hpp | 15 | ||||
-rw-r--r-- | tests/Makefile.am | 5 | ||||
-rw-r--r-- | tests/test_reqrep_device.cpp | 160 |
12 files changed, 344 insertions, 70 deletions
@@ -29,6 +29,7 @@ tests/test_reqrep_tcp tests/test_shutdown_stress tests/test_hwm tests/test_timeo +tests/test_reqrep_device src/platform.hpp* src/stamp-h1 devices/zmq_forwarder/zmq_forwarder diff --git a/src/Makefile.am b/src/Makefile.am index 92ceb20..ae20d33 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -53,6 +53,7 @@ libzmq_la_SOURCES = \ pub.hpp \ pull.hpp \ push.hpp \ + random.hpp \ reaper.hpp \ rep.hpp \ req.hpp \ @@ -117,6 +118,7 @@ libzmq_la_SOURCES = \ push.cpp \ reaper.cpp \ pub.cpp \ + random.cpp \ rep.cpp \ req.cpp \ router.cpp \ diff --git a/src/random.cpp b/src/random.cpp new file mode 100644 index 0000000..ee7a7fb --- /dev/null +++ b/src/random.cpp @@ -0,0 +1,39 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "random.hpp" +#include "uuid.hpp" +#include "err.hpp" + +// Here we can use different ways of generating random data, as avialable +// on different platforms. At the moment, we'll assume the UUID is random +// enough to use for that purpose. +void zmq::generate_random (void *buf_, size_t size_) +{ + // Collapsing an UUID into 4 bytes. + zmq_assert (size_ == 4); + uint32_t buff [4]; + generate_uuid ((void*) buff); + uint32_t result = buff [0]; + result ^= buff [1]; + result ^= buff [2]; + result ^= buff [3]; + *((uint32_t*) buf_) = result; +} diff --git a/src/random.hpp b/src/random.hpp new file mode 100644 index 0000000..0b99bbd --- /dev/null +++ b/src/random.hpp @@ -0,0 +1,34 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_RANDOM_HPP_INCLUDED__ +#define __ZMQ_RANDOM_HPP_INCLUDED__ + +#include <stddef.h> + +namespace zmq +{ + + // Generates truly random bytes (not pseudo-random). + void generate_random (void *buf_, size_t size_); + +} + +#endif diff --git a/src/rep.cpp b/src/rep.cpp index b987d9c..a5d1462 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -64,54 +64,32 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_) return -1; } + // First thing to do when receiving a request is to copy all the labels + // to the reply pipe. if (request_begins) { - - // Copy the backtrace stack to the reply pipe. while (true) { - - // TODO: If request can be read but reply pipe is not - // ready for writing, we should drop the reply. - - // Get next part of the backtrace stack. int rc = xrep_t::xrecv (msg_, flags_); if (rc != 0) return rc; + if (!(msg_->flags () & msg_t::label)) + break; - if (msg_->flags () & (msg_t::more | msg_t::label)) { - - // Empty message part delimits the traceback stack. - bool bottom = (msg_->size () == 0); - - // Push it to the reply pipe. - rc = xrep_t::xsend (msg_, flags_); - zmq_assert (rc == 0); - - // The end of the traceback, move to processing message body. - if (bottom) - break; - } - else { - - // If the traceback stack is malformed, discard anything - // already sent to pipe (we're at end of invalid message) - // and continue reading -- that'll switch us to the next pipe - // and next request. - rc = xrep_t::rollback (); - zmq_assert (rc == 0); - } + // TODO: If the reply cannot be sent to the peer because + // od congestion, we should drop it. + rc = xrep_t::xsend (msg_, flags_); + zmq_assert (rc == 0); } - request_begins = false; } - - // Now the routing info is safely stored. Get the first part - // of the message payload and exit. - int rc = xrep_t::xrecv (msg_, flags_); - if (rc != 0) - return rc; + else { + int rc = xrep_t::xrecv (msg_, flags_); + if (rc != 0) + return rc; + } + zmq_assert (!(msg_->flags () & msg_t::label)); // If whole request is read, flip the FSM to reply-sending state. - if (!(msg_->flags () & (msg_t::more | msg_t::label))) { + if (!(msg_->flags () & msg_t::more)) { sending_reply = true; request_begins = true; } diff --git a/src/req.cpp b/src/req.cpp index b0e58dc..e51b853 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -21,13 +21,21 @@ #include "req.hpp" #include "err.hpp" #include "msg.hpp" +#include "uuid.hpp" +#include "wire.hpp" +#include "random.hpp" +#include "likely.hpp" zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) : xreq_t (parent_, tid_), receiving_reply (false), - message_begins (true) + message_begins (true), + request_id (0) { options.type = ZMQ_REQ; + + // Start the request ID sequence at an random point. + generate_random (&request_id, sizeof (request_id)); } zmq::req_t::~req_t () @@ -43,12 +51,14 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_) return -1; } - // First part of the request is empty message part (stack bottom). + // First part of the request is the request identity. if (message_begins) { msg_t prefix; - int rc = prefix.init (); + int rc = prefix.init_size (4); errno_assert (rc == 0); prefix.set_flags (msg_t::label); + unsigned char *data = (unsigned char*) prefix.data (); + put_uint32 (data, request_id); rc = xreq_t::xsend (&prefix, flags_); if (rc != 0) return rc; @@ -78,13 +88,28 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) return -1; } - // First part of the reply should be empty message part (stack bottom). + // First part of the reply should be the original request ID. if (message_begins) { int rc = xreq_t::xrecv (msg_, flags_); if (rc != 0) return rc; zmq_assert (msg_->flags () & msg_t::label); - zmq_assert (msg_->size () == 0); + zmq_assert (msg_->size () == 4); + unsigned char *data = (unsigned char*) msg_->data (); + if (unlikely (get_uint32 (data) != request_id)) { + + // The request ID does not match. Drop the entire message. + while (true) { + int rc = xreq_t::xrecv (msg_, flags_); + errno_assert (rc == 0); + if (!(msg_->flags () & (msg_t::label | msg_t::more))) + break; + } + msg_->close (); + msg_->init (); + errno = EAGAIN; + return -1; + } message_begins = false; } @@ -94,6 +119,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) // If the reply is fully received, flip the FSM into request-sending state. if (!(msg_->flags () & (msg_t::more | msg_t::label))) { + request_id++; receiving_reply = false; message_begins = true; } @@ -103,6 +129,8 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) bool zmq::req_t::xhas_in () { + // TODO: Duplicates should be removed here. + if (!receiving_reply) return false; diff --git a/src/req.hpp b/src/req.hpp index e0554ac..50dcb44 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -22,6 +22,7 @@ #define __ZMQ_REQ_HPP_INCLUDED__ #include "xreq.hpp" +#include "stdint.hpp" namespace zmq { @@ -49,6 +50,10 @@ namespace zmq // of the message must be empty message part (backtrace stack bottom). bool message_begins; + // Request ID. Request numbers gradually increase (and wrap over) + // so that we don't have to generate random ID for each request. + uint32_t request_id; + req_t (const req_t&); const req_t &operator = (const req_t&); }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index eaf1776..804ec46 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -598,15 +598,15 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ticks = 0; rc = xrecv (msg_, flags_); - if (rc == 0) { - rcvlabel = msg_->flags () & msg_t::label; - if (rcvlabel) - msg_->reset_flags (msg_t::label); - rcvmore = msg_->flags () & msg_t::more; - if (rcvmore) - msg_->reset_flags (msg_t::more); - } - return rc; + if (rc < 0) + return rc; + rcvlabel = msg_->flags () & msg_t::label; + if (rcvlabel) + msg_->reset_flags (msg_t::label); + rcvmore = msg_->flags () & msg_t::more; + if (rcvmore) + msg_->reset_flags (msg_t::more); + return 0; } // Compute the time when the timeout should occur. diff --git a/src/xrep.cpp b/src/xrep.cpp index b935c06..ab2f0a8 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -20,6 +20,8 @@ #include "xrep.hpp" #include "pipe.hpp" +#include "wire.hpp" +#include "random.hpp" #include "err.hpp" zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : @@ -32,9 +34,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : { options.type = ZMQ_XREP; - // On connect, pipes are created only after initial handshaking. - // That way we are aware of the peer's identity when binding to the pipes. - options.immediate_connect = false; + // Start the peer ID sequence from a random point. + generate_random (&next_peer_id, sizeof (next_peer_id)); } zmq::xrep_t::~xrep_t () @@ -47,16 +48,33 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (pipe_); + // Generate a new peer ID. Take care to avoid duplicates. + outpipes_t::iterator it = outpipes.lower_bound (next_peer_id); + if (!outpipes.empty ()) { + while (true) { + if (it == outpipes.end ()) + it = outpipes.begin (); + if (it->first != next_peer_id) + break; + ++next_peer_id; + ++it; + } + } + // Add the pipe to the map out outbound pipes. - // TODO: What if new connection has same peer identity as the old one? outpipe_t outpipe = {pipe_, true}; bool ok = outpipes.insert (outpipes_t::value_type ( - peer_identity_, outpipe)).second; + next_peer_id, outpipe)).second; zmq_assert (ok); // Add the pipe to the list of inbound pipes. - inpipe_t inpipe = {pipe_, peer_identity_, true}; + inpipe_t inpipe = {pipe_, next_peer_id, true}; inpipes.push_back (inpipe); + + // Advance next peer ID so that if new connection is dropped shortly after + // its creation we don't accidentally get two subsequent peers with + // the same ID. + ++next_peer_id; } void zmq::xrep_t::xterminated (pipe_t *pipe_) @@ -115,7 +133,7 @@ void zmq::xrep_t::xwrite_activated (pipe_t *pipe_) int zmq::xrep_t::xsend (msg_t *msg_, int flags_) { - // If this is the first part of the message it's the identity of the + // If this is the first part of the message it's the ID of the // peer to send the message to. if (!more_out) { zmq_assert (!current_out); @@ -127,10 +145,11 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) more_out = true; - // Find the pipe associated with the identity stored in the prefix. + // Find the pipe associated with the peer ID stored in the prefix. // If there's no such pipe just silently ignore the message. - blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); - outpipes_t::iterator it = outpipes.find (identity); + zmq_assert (msg_->size () == 4); + uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ()); + outpipes_t::iterator it = outpipes.find (peer_id); if (it != outpipes.end ()) { current_out = it->second.pipe; @@ -220,10 +239,10 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) // If we have a message, create a prefix and return it to the caller. if (prefetched) { - int rc = msg_->init_size (inpipes [current_in].identity.size ()); + int rc = msg_->init_size (4); errno_assert (rc == 0); - memcpy (msg_->data (), inpipes [current_in].identity.data (), - msg_->size ()); + put_uint32 ((unsigned char*) msg_->data (), + inpipes [current_in].peer_id); msg_->set_flags (msg_t::label); return 0; } diff --git a/src/xrep.hpp b/src/xrep.hpp index fbc7385..d5014e0 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -25,7 +25,7 @@ #include <vector> #include "socket_base.hpp" -#include "blob.hpp" +#include "stdint.hpp" #include "msg.hpp" namespace zmq @@ -41,7 +41,8 @@ namespace zmq ~xrep_t (); // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_, + const blob_t &peer_identity_); int xsend (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); @@ -60,7 +61,7 @@ namespace zmq struct inpipe_t { class pipe_t *pipe; - blob_t identity; + uint32_t peer_id; bool active; }; @@ -86,8 +87,8 @@ namespace zmq bool active; }; - // Outbound pipes indexed by the peer names. - typedef std::map <blob_t, outpipe_t> outpipes_t; + // Outbound pipes indexed by the peer IDs. + typedef std::map <uint32_t, outpipe_t> outpipes_t; outpipes_t outpipes; // The pipe we are currently writing to. @@ -96,6 +97,10 @@ namespace zmq // If true, more outgoing message parts are expected. bool more_out; + // Peer ID are generated. It's a simple increment and wrap-over + // algorithm. This value is the next ID to use (if not used already). + uint32_t next_peer_id; + xrep_t (const xrep_t&); const xrep_t &operator = (const xrep_t&); }; diff --git a/tests/Makefile.am b/tests/Makefile.am index 785b7c5..9238850 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -5,7 +5,8 @@ noinst_PROGRAMS = test_pair_inproc \ test_pair_tcp \ test_reqrep_inproc \ test_reqrep_tcp \ - test_hwm + test_hwm \ + test_reqrep_device if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -22,6 +23,8 @@ test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp test_hwm_SOURCES = test_hwm.cpp +test_reqrep_device_SOURCES = test_reqrep_device.cpp + if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp diff --git a/tests/test_reqrep_device.cpp b/tests/test_reqrep_device.cpp new file mode 100644 index 0000000..f6f06c9 --- /dev/null +++ b/tests/test_reqrep_device.cpp @@ -0,0 +1,160 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <assert.h> +#include <string.h> + +#include "../include/zmq.h" + +int main (int argc, char *argv []) +{ + void *ctx = zmq_init (1); + assert (ctx); + + // Create a req/rep device. + void *xreq = zmq_socket (ctx, ZMQ_XREQ); + assert (xreq); + int rc = zmq_bind (xreq, "tcp://127.0.0.1:5560"); + assert (rc == 0); + void *xrep = zmq_socket (ctx, ZMQ_XREP); + assert (xrep); + rc = zmq_bind (xrep, "tcp://127.0.0.1:5561"); + assert (rc == 0); + + // Create a worker. + void *rep = zmq_socket (ctx, ZMQ_REP); + assert (rep); + rc = zmq_connect (rep, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + // Create a client. + void *req = zmq_socket (ctx, ZMQ_REQ); + assert (req); + rc = zmq_connect (req, "tcp://127.0.0.1:5561"); + assert (rc == 0); + + // Send a request. + rc = zmq_send (req, "ABC", 3, ZMQ_SNDMORE); + assert (rc == 3); + rc = zmq_send (req, "DEF", 3, 0); + assert (rc == 3); + + // Pass the request through the device. + for (int i = 0; i != 4; i++) { + zmq_msg_t msg; + rc = zmq_msg_init (&msg); + assert (rc == 0); + rc = zmq_recvmsg (xrep, &msg, 0); + assert (rc >= 0); + int rcvlabel; + size_t sz = sizeof (rcvlabel); + rc = zmq_getsockopt (xrep, ZMQ_RCVLABEL, &rcvlabel, &sz); + assert (rc == 0); + int rcvmore; + rc = zmq_getsockopt (xrep, ZMQ_RCVMORE, &rcvmore, &sz); + assert (rc == 0); + rc = zmq_sendmsg (xreq, &msg, + (rcvlabel ? ZMQ_SNDLABEL : 0) | (rcvmore ? ZMQ_SNDMORE : 0)); + assert (rc >= 0); + } + + // Receive the request. + char buff [3]; + rc = zmq_recv (rep, buff, 3, 0); + assert (rc == 3); + assert (memcmp (buff, "ABC", 3) == 0); + int rcvlabel; + size_t sz = sizeof (rcvlabel); + rc = zmq_getsockopt (rep, ZMQ_RCVLABEL, &rcvlabel, &sz); + assert (rc == 0); + assert (!rcvlabel); + int rcvmore; + rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz); + assert (rc == 0); + assert (rcvmore); + rc = zmq_recv (rep, buff, 3, 0); + assert (rc == 3); + assert (memcmp (buff, "DEF", 3) == 0); + rc = zmq_getsockopt (rep, ZMQ_RCVLABEL, &rcvlabel, &sz); + assert (rc == 0); + assert (!rcvlabel); + rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz); + assert (rc == 0); + assert (!rcvmore); + + // Send the reply. + rc = zmq_send (rep, "GHI", 3, ZMQ_SNDMORE); + assert (rc == 3); + rc = zmq_send (rep, "JKL", 3, 0); + assert (rc == 3); + + // Pass the reply through the device. + for (int i = 0; i != 4; i++) { + zmq_msg_t msg; + rc = zmq_msg_init (&msg); + assert (rc == 0); + rc = zmq_recvmsg (xreq, &msg, 0); + assert (rc >= 0); + int rcvlabel; + size_t sz = sizeof (rcvlabel); + rc = zmq_getsockopt (xreq, ZMQ_RCVLABEL, &rcvlabel, &sz); + assert (rc == 0); + int rcvmore; + rc = zmq_getsockopt (xreq, ZMQ_RCVMORE, &rcvmore, &sz); + assert (rc == 0); + rc = zmq_sendmsg (xrep, &msg, + (rcvlabel ? ZMQ_SNDLABEL : 0) | (rcvmore ? ZMQ_SNDMORE : 0)); + assert (rc >= 0); + } + + // Receive the reply. + rc = zmq_recv (req, buff, 3, 0); + assert (rc == 3); + assert (memcmp (buff, "GHI", 3) == 0); + rc = zmq_getsockopt (req, ZMQ_RCVLABEL, &rcvlabel, &sz); + assert (rc == 0); + assert (!rcvlabel); + rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz); + assert (rc == 0); + assert (rcvmore); + rc = zmq_recv (req, buff, 3, 0); + assert (rc == 3); + assert (memcmp (buff, "JKL", 3) == 0); + rc = zmq_getsockopt (req, ZMQ_RCVLABEL, &rcvlabel, &sz); + assert (rc == 0); + assert (!rcvlabel); + rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz); + assert (rc == 0); + assert (!rcvmore); + + // Clean up. + rc = zmq_close (req); + assert (rc == 0); + rc = zmq_close (rep); + assert (rc == 0); + rc = zmq_close (xrep); + assert (rc == 0); + rc = zmq_close (xreq); + assert (rc == 0); + rc = zmq_term (ctx); + assert (rc == 0); + + return 0 ; +} |