From ec81f8fb2523e1e2fe45eaadc05311a35bf551d7 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 22 Jun 2011 11:02:16 +0200 Subject: New wire format for REQ/REP pattern This patch introduces two changes: 1. 32-bit ID is used to identify the peer instead of UUID 2. REQ socket seeds the label stack with unique 32-bit request ID It also drops any replies with non-matching request ID Signed-off-by: Martin Sustrik --- tests/Makefile.am | 5 +- tests/test_reqrep_device.cpp | 160 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 164 insertions(+), 1 deletion(-) create mode 100644 tests/test_reqrep_device.cpp (limited to 'tests') diff --git a/tests/Makefile.am b/tests/Makefile.am index 785b7c5..9238850 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -5,7 +5,8 @@ noinst_PROGRAMS = test_pair_inproc \ test_pair_tcp \ test_reqrep_inproc \ test_reqrep_tcp \ - test_hwm + test_hwm \ + test_reqrep_device if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -22,6 +23,8 @@ test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp test_hwm_SOURCES = test_hwm.cpp +test_reqrep_device_SOURCES = test_reqrep_device.cpp + if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp diff --git a/tests/test_reqrep_device.cpp b/tests/test_reqrep_device.cpp new file mode 100644 index 0000000..f6f06c9 --- /dev/null +++ b/tests/test_reqrep_device.cpp @@ -0,0 +1,160 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include + +#include "../include/zmq.h" + +int main (int argc, char *argv []) +{ + void *ctx = zmq_init (1); + assert (ctx); + + // Create a req/rep device. + void *xreq = zmq_socket (ctx, ZMQ_XREQ); + assert (xreq); + int rc = zmq_bind (xreq, "tcp://127.0.0.1:5560"); + assert (rc == 0); + void *xrep = zmq_socket (ctx, ZMQ_XREP); + assert (xrep); + rc = zmq_bind (xrep, "tcp://127.0.0.1:5561"); + assert (rc == 0); + + // Create a worker. + void *rep = zmq_socket (ctx, ZMQ_REP); + assert (rep); + rc = zmq_connect (rep, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + // Create a client. + void *req = zmq_socket (ctx, ZMQ_REQ); + assert (req); + rc = zmq_connect (req, "tcp://127.0.0.1:5561"); + assert (rc == 0); + + // Send a request. + rc = zmq_send (req, "ABC", 3, ZMQ_SNDMORE); + assert (rc == 3); + rc = zmq_send (req, "DEF", 3, 0); + assert (rc == 3); + + // Pass the request through the device. + for (int i = 0; i != 4; i++) { + zmq_msg_t msg; + rc = zmq_msg_init (&msg); + assert (rc == 0); + rc = zmq_recvmsg (xrep, &msg, 0); + assert (rc >= 0); + int rcvlabel; + size_t sz = sizeof (rcvlabel); + rc = zmq_getsockopt (xrep, ZMQ_RCVLABEL, &rcvlabel, &sz); + assert (rc == 0); + int rcvmore; + rc = zmq_getsockopt (xrep, ZMQ_RCVMORE, &rcvmore, &sz); + assert (rc == 0); + rc = zmq_sendmsg (xreq, &msg, + (rcvlabel ? ZMQ_SNDLABEL : 0) | (rcvmore ? ZMQ_SNDMORE : 0)); + assert (rc >= 0); + } + + // Receive the request. + char buff [3]; + rc = zmq_recv (rep, buff, 3, 0); + assert (rc == 3); + assert (memcmp (buff, "ABC", 3) == 0); + int rcvlabel; + size_t sz = sizeof (rcvlabel); + rc = zmq_getsockopt (rep, ZMQ_RCVLABEL, &rcvlabel, &sz); + assert (rc == 0); + assert (!rcvlabel); + int rcvmore; + rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz); + assert (rc == 0); + assert (rcvmore); + rc = zmq_recv (rep, buff, 3, 0); + assert (rc == 3); + assert (memcmp (buff, "DEF", 3) == 0); + rc = zmq_getsockopt (rep, ZMQ_RCVLABEL, &rcvlabel, &sz); + assert (rc == 0); + assert (!rcvlabel); + rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz); + assert (rc == 0); + assert (!rcvmore); + + // Send the reply. + rc = zmq_send (rep, "GHI", 3, ZMQ_SNDMORE); + assert (rc == 3); + rc = zmq_send (rep, "JKL", 3, 0); + assert (rc == 3); + + // Pass the reply through the device. + for (int i = 0; i != 4; i++) { + zmq_msg_t msg; + rc = zmq_msg_init (&msg); + assert (rc == 0); + rc = zmq_recvmsg (xreq, &msg, 0); + assert (rc >= 0); + int rcvlabel; + size_t sz = sizeof (rcvlabel); + rc = zmq_getsockopt (xreq, ZMQ_RCVLABEL, &rcvlabel, &sz); + assert (rc == 0); + int rcvmore; + rc = zmq_getsockopt (xreq, ZMQ_RCVMORE, &rcvmore, &sz); + assert (rc == 0); + rc = zmq_sendmsg (xrep, &msg, + (rcvlabel ? ZMQ_SNDLABEL : 0) | (rcvmore ? ZMQ_SNDMORE : 0)); + assert (rc >= 0); + } + + // Receive the reply. + rc = zmq_recv (req, buff, 3, 0); + assert (rc == 3); + assert (memcmp (buff, "GHI", 3) == 0); + rc = zmq_getsockopt (req, ZMQ_RCVLABEL, &rcvlabel, &sz); + assert (rc == 0); + assert (!rcvlabel); + rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz); + assert (rc == 0); + assert (rcvmore); + rc = zmq_recv (req, buff, 3, 0); + assert (rc == 3); + assert (memcmp (buff, "JKL", 3) == 0); + rc = zmq_getsockopt (req, ZMQ_RCVLABEL, &rcvlabel, &sz); + assert (rc == 0); + assert (!rcvlabel); + rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz); + assert (rc == 0); + assert (!rcvmore); + + // Clean up. + rc = zmq_close (req); + assert (rc == 0); + rc = zmq_close (rep); + assert (rc == 0); + rc = zmq_close (xrep); + assert (rc == 0); + rc = zmq_close (xreq); + assert (rc == 0); + rc = zmq_term (ctx); + assert (rc == 0); + + return 0 ; +} -- cgit v1.2.3