From 36fd87810274329c8cd86344b95a0521541e7bab Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 21 Apr 2012 07:07:57 +0200 Subject: xs_shutdown implemented This patch allows for partial shutdown of the socket. Signed-off-by: Martin Sustrik --- tests/Makefile.am | 4 +- tests/hwm.cpp | 4 +- tests/invalid_rep.cpp | 4 +- tests/libzmq21.cpp | 4 +- tests/linger.cpp | 2 +- tests/msg_flags.cpp | 4 +- tests/pair_inproc.cpp | 4 +- tests/pair_ipc.cpp | 4 +- tests/pair_tcp.cpp | 4 +- tests/polltimeo.cpp | 4 +- tests/reconnect.cpp | 8 ++-- tests/reqrep_device.cpp | 8 ++-- tests/reqrep_inproc.cpp | 4 +- tests/reqrep_ipc.cpp | 4 +- tests/reqrep_tcp.cpp | 4 +- tests/resubscribe.cpp | 6 +-- tests/shutdown.cpp | 103 ++++++++++++++++++++++++++++++++++++++++++++++ tests/shutdown_stress.cpp | 4 +- tests/sub_forward.cpp | 8 ++-- tests/survey.cpp | 10 ++--- tests/tests.cpp | 6 +++ tests/timeo.cpp | 6 +-- tests/wireformat.cpp | 4 +- 23 files changed, 162 insertions(+), 51 deletions(-) create mode 100644 tests/shutdown.cpp (limited to 'tests') diff --git a/tests/Makefile.am b/tests/Makefile.am index ba76260..47a880b 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -24,7 +24,8 @@ noinst_PROGRAMS = pair_inproc \ wireformat \ libzmq21 \ resubscribe \ - survey + survey \ + shutdown pair_inproc_SOURCES = pair_inproc.cpp testutil.hpp pair_tcp_SOURCES = pair_tcp.cpp testutil.hpp @@ -48,5 +49,6 @@ wireformat_SOURCES = wireformat.cpp libzmq21_SOURCES = libzmq21.cpp resubscribe_SOURCES = resubscribe.cpp survey_SOURCES = survey.cpp +shutdown_SOURCES = shutdown.cpp TESTS = $(noinst_PROGRAMS) diff --git a/tests/hwm.cpp b/tests/hwm.cpp index 6cd9a41..93e8910 100644 --- a/tests/hwm.cpp +++ b/tests/hwm.cpp @@ -35,14 +35,14 @@ int XS_TEST_MAIN () int rc = xs_setsockopt (sb, XS_RCVHWM, &hwm, sizeof (hwm)); assert (rc == 0); rc = xs_bind (sb, "inproc://a"); - assert (rc == 0); + assert (rc != -1); void *sc = xs_socket (ctx, XS_PUSH); assert (sc); rc = xs_setsockopt (sc, XS_SNDHWM, &hwm, sizeof (hwm)); assert (rc == 0); rc = xs_connect (sc, "inproc://a"); - assert (rc == 0); + assert (rc != -1); // Try to send 10 messages. Only 4 should succeed. for (int i = 0; i < 10; i++) diff --git a/tests/invalid_rep.cpp b/tests/invalid_rep.cpp index cd0ad8e..93118b6 100644 --- a/tests/invalid_rep.cpp +++ b/tests/invalid_rep.cpp @@ -38,9 +38,9 @@ int XS_TEST_MAIN () rc = xs_setsockopt (req_socket, XS_LINGER, &linger, sizeof (int)); assert (rc == 0); rc = xs_bind (xrep_socket, "inproc://hi"); - assert (rc == 0); + assert (rc != -1); rc = xs_connect (req_socket, "inproc://hi"); - assert (rc == 0); + assert (rc != -1); // Initial request. rc = xs_send (req_socket, "r", 1, 0); diff --git a/tests/libzmq21.cpp b/tests/libzmq21.cpp index e7affae..ecf271b 100644 --- a/tests/libzmq21.cpp +++ b/tests/libzmq21.cpp @@ -57,7 +57,7 @@ int XS_TEST_MAIN () int rc = xs_setsockopt (pub, XS_PROTOCOL, &protocol, sizeof (protocol)); assert (rc == 0); rc = xs_bind (pub, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); int oldsub = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); struct sockaddr_in address; @@ -105,7 +105,7 @@ int XS_TEST_MAIN () rc = xs_setsockopt (sub, XS_SUBSCRIBE, "", 0); assert (rc == 0); rc = xs_bind (sub, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); int oldpub = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); address.sin_family = AF_INET; diff --git a/tests/linger.cpp b/tests/linger.cpp index fec0f72..1b1d341 100644 --- a/tests/linger.cpp +++ b/tests/linger.cpp @@ -37,7 +37,7 @@ int XS_TEST_MAIN () // Connect to non-existent endpoing. assert (rc == 0); rc = xs_connect (s, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); // Send a message. rc = xs_send (s, "r", 1, 0); diff --git a/tests/msg_flags.cpp b/tests/msg_flags.cpp index eade40f..3f3b3b1 100644 --- a/tests/msg_flags.cpp +++ b/tests/msg_flags.cpp @@ -30,11 +30,11 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_XREP); assert (sb); int rc = xs_bind (sb, "inproc://a"); - assert (rc == 0); + assert (rc != -1); void *sc = xs_socket (ctx, XS_XREQ); assert (sc); rc = xs_connect (sc, "inproc://a"); - assert (rc == 0); + assert (rc != -1); // Send 2-part message. rc = xs_send (sc, "A", 1, XS_SNDMORE); diff --git a/tests/pair_inproc.cpp b/tests/pair_inproc.cpp index c02d08b..456a371 100644 --- a/tests/pair_inproc.cpp +++ b/tests/pair_inproc.cpp @@ -30,12 +30,12 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_PAIR); assert (sb); int rc = xs_bind (sb, "inproc://a"); - assert (rc == 0); + assert (rc != -1); void *sc = xs_socket (ctx, XS_PAIR); assert (sc); rc = xs_connect (sc, "inproc://a"); - assert (rc == 0); + assert (rc != -1); bounce (sb, sc); diff --git a/tests/pair_ipc.cpp b/tests/pair_ipc.cpp index 43915cc..ef39bfc 100644 --- a/tests/pair_ipc.cpp +++ b/tests/pair_ipc.cpp @@ -37,12 +37,12 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_PAIR); assert (sb); int rc = xs_bind (sb, "ipc:///tmp/tester"); - assert (rc == 0); + assert (rc != -1); void *sc = xs_socket (ctx, XS_PAIR); assert (sc); rc = xs_connect (sc, "ipc:///tmp/tester"); - assert (rc == 0); + assert (rc != -1); bounce (sb, sc); diff --git a/tests/pair_tcp.cpp b/tests/pair_tcp.cpp index a0decb9..1df90ad 100644 --- a/tests/pair_tcp.cpp +++ b/tests/pair_tcp.cpp @@ -31,12 +31,12 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_PAIR); assert (sb); int rc = xs_bind (sb, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); void *sc = xs_socket (ctx, XS_PAIR); assert (sc); rc = xs_connect (sc, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); bounce (sb, sc); diff --git a/tests/polltimeo.cpp b/tests/polltimeo.cpp index ca88393..f425593 100644 --- a/tests/polltimeo.cpp +++ b/tests/polltimeo.cpp @@ -30,7 +30,7 @@ extern "C" void *sc = xs_socket (ctx_, XS_PUSH); assert (sc); int rc = xs_connect (sc, "inproc://timeout_test"); - assert (rc == 0); + assert (rc != -1); sleep (1); rc = xs_close (sc); assert (rc == 0); @@ -48,7 +48,7 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_PULL); assert (sb); int rc = xs_bind (sb, "inproc://timeout_test"); - assert (rc == 0); + assert (rc != -1); // Check whether timeout is honoured. xs_pollitem_t pi; diff --git a/tests/reconnect.cpp b/tests/reconnect.cpp index c677527..ebbaf32 100644 --- a/tests/reconnect.cpp +++ b/tests/reconnect.cpp @@ -34,7 +34,7 @@ int XS_TEST_MAIN () // Connect before bind was done at the peer and send one message. int rc = xs_connect (push, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); rc = xs_send (push, "ABC", 3, 0); assert (rc == 3); @@ -43,7 +43,7 @@ int XS_TEST_MAIN () // Bind the peer and get the message. rc = xs_bind (pull, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); unsigned char buf [3]; rc = xs_recv (pull, buf, sizeof (buf), 0); assert (rc == 3); @@ -64,7 +64,7 @@ int XS_TEST_MAIN () // Connect before bind was done at the peer and send one message. rc = xs_connect (push, "ipc:///tmp/tester"); - assert (rc == 0); + assert (rc != -1); rc = xs_send (push, "ABC", 3, 0); assert (rc == 3); @@ -73,7 +73,7 @@ int XS_TEST_MAIN () // Bind the peer and get the message. rc = xs_bind (pull, "ipc:///tmp/tester"); - assert (rc == 0); + assert (rc != -1); rc = xs_recv (pull, buf, sizeof (buf), 0); assert (rc == 3); diff --git a/tests/reqrep_device.cpp b/tests/reqrep_device.cpp index 81960d9..732f4dc 100644 --- a/tests/reqrep_device.cpp +++ b/tests/reqrep_device.cpp @@ -32,23 +32,23 @@ int XS_TEST_MAIN () void *xreq = xs_socket (ctx, XS_XREQ); assert (xreq); int rc = xs_bind (xreq, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); void *xrep = xs_socket (ctx, XS_XREP); assert (xrep); rc = xs_bind (xrep, "tcp://127.0.0.1:5561"); - assert (rc == 0); + assert (rc != -1); // Create a worker. void *rep = xs_socket (ctx, XS_REP); assert (rep); rc = xs_connect (rep, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); // Create a client. void *req = xs_socket (ctx, XS_REQ); assert (req); rc = xs_connect (req, "tcp://127.0.0.1:5561"); - assert (rc == 0); + assert (rc != -1); // Send a request. rc = xs_send (req, "ABC", 3, XS_SNDMORE); diff --git a/tests/reqrep_inproc.cpp b/tests/reqrep_inproc.cpp index c66b82b..bd5cc92 100644 --- a/tests/reqrep_inproc.cpp +++ b/tests/reqrep_inproc.cpp @@ -30,12 +30,12 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_REP); assert (sb); int rc = xs_bind (sb, "inproc://a"); - assert (rc == 0); + assert (rc != -1); void *sc = xs_socket (ctx, XS_REQ); assert (sc); rc = xs_connect (sc, "inproc://a"); - assert (rc == 0); + assert (rc != -1); bounce (sb, sc); diff --git a/tests/reqrep_ipc.cpp b/tests/reqrep_ipc.cpp index 9be50fb..66315ef 100644 --- a/tests/reqrep_ipc.cpp +++ b/tests/reqrep_ipc.cpp @@ -37,12 +37,12 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_REP); assert (sb); int rc = xs_bind (sb, "ipc:///tmp/tester"); - assert (rc == 0); + assert (rc != -1); void *sc = xs_socket (ctx, XS_REQ); assert (sc); rc = xs_connect (sc, "ipc:///tmp/tester"); - assert (rc == 0); + assert (rc != -1); bounce (sb, sc); diff --git a/tests/reqrep_tcp.cpp b/tests/reqrep_tcp.cpp index b11d5a1..f68d762 100644 --- a/tests/reqrep_tcp.cpp +++ b/tests/reqrep_tcp.cpp @@ -31,12 +31,12 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_REP); assert (sb); int rc = xs_bind (sb, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); void *sc = xs_socket (ctx, XS_REQ); assert (sc); rc = xs_connect (sc, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); bounce (sb, sc); diff --git a/tests/resubscribe.cpp b/tests/resubscribe.cpp index 1924434..5d78712 100644 --- a/tests/resubscribe.cpp +++ b/tests/resubscribe.cpp @@ -34,13 +34,13 @@ int XS_TEST_MAIN () // Send two subscriptions upstream. int rc = xs_bind (xpub, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); rc = xs_setsockopt (sub, XS_SUBSCRIBE, "a", 1); assert (rc == 0); rc = xs_setsockopt (sub, XS_SUBSCRIBE, "b", 1); assert (rc == 0); rc = xs_connect (sub, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); // Check whether subscriptions are correctly received. char buf [5]; @@ -68,7 +68,7 @@ int XS_TEST_MAIN () xpub = xs_socket (ctx, XS_XPUB); assert (xpub); rc = xs_bind (xpub, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); // We have to give control to the SUB socket here so that it has // chance to resend the subscriptions. diff --git a/tests/shutdown.cpp b/tests/shutdown.cpp new file mode 100644 index 0000000..9f055a1 --- /dev/null +++ b/tests/shutdown.cpp @@ -0,0 +1,103 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" + +int XS_TEST_MAIN () +{ + int rc; + char buf [32]; + + fprintf (stderr, "shutdown test running...\n"); + + // Create infrastructure. + void *ctx = xs_init (); + assert (ctx); + void *push = xs_socket (ctx, XS_PUSH); + assert (push); + int push_id = xs_bind (push, "tcp://127.0.0.1:5560"); + assert (push_id != -1); + void *pull = xs_socket (ctx, XS_PULL); + assert (pull); + rc = xs_connect (pull, "tcp://127.0.0.1:5560"); + assert (rc != -1); + + // Pass one message through to ensure the connection is established. + rc = xs_send (push, "ABC", 3, 0); + assert (rc == 3); + rc = xs_recv (pull, buf, sizeof (buf), 0); + assert (rc == 3); + + // Shut down the bound endpoint. + rc = xs_shutdown (push, push_id); + assert (rc == 0); + sleep (1); + + // Check that sending would block (there's no outbound connection). + rc = xs_send (push, "ABC", 3, XS_DONTWAIT); + assert (rc == -1 && xs_errno () == EAGAIN); + + // Clean up. + rc = xs_close (pull); + assert (rc == 0); + rc = xs_close (push); + assert (rc == 0); + rc = xs_term (ctx); + assert (rc == 0); + + // Now the other way round. + + // Create infrastructure. + ctx = xs_init (); + assert (ctx); + pull = xs_socket (ctx, XS_PULL); + assert (pull); + rc = xs_bind (pull, "tcp://127.0.0.1:5560"); + assert (rc != -1); + push = xs_socket (ctx, XS_PUSH); + assert (push); + push_id = xs_connect (push, "tcp://127.0.0.1:5560"); + assert (push_id != -1); + + // Pass one message through to ensure the connection is established. + rc = xs_send (push, "ABC", 3, 0); + assert (rc == 3); + rc = xs_recv (pull, buf, sizeof (buf), 0); + assert (rc == 3); + + // Shut down the bound endpoint. + rc = xs_shutdown (push, push_id); + assert (rc == 0); + sleep (1); + + // Check that sending would block (there's no outbound connection). + rc = xs_send (push, "ABC", 3, XS_DONTWAIT); + assert (rc == -1 && xs_errno () == EAGAIN); + + // Clean up. + rc = xs_close (pull); + assert (rc == 0); + rc = xs_close (push); + assert (rc == 0); + rc = xs_term (ctx); + assert (rc == 0); + + return 0; +} diff --git a/tests/shutdown_stress.cpp b/tests/shutdown_stress.cpp index f106cc0..2b598a9 100644 --- a/tests/shutdown_stress.cpp +++ b/tests/shutdown_stress.cpp @@ -30,7 +30,7 @@ extern "C" int rc; rc = xs_connect (s_, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); // Start closing the socket while the connecting process is underway. rc = xs_close (s_); @@ -65,7 +65,7 @@ int XS_TEST_MAIN () assert (s1); rc = xs_bind (s1, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); for (i = 0; i != THREAD_COUNT; i++) { s2 = xs_socket (ctx, XS_SUB); diff --git a/tests/sub_forward.cpp b/tests/sub_forward.cpp index 6d385de..0237352 100644 --- a/tests/sub_forward.cpp +++ b/tests/sub_forward.cpp @@ -32,23 +32,23 @@ int XS_TEST_MAIN () void *xpub = xs_socket (ctx, XS_XPUB); assert (xpub); int rc = xs_bind (xpub, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); void *xsub = xs_socket (ctx, XS_XSUB); assert (xsub); rc = xs_bind (xsub, "tcp://127.0.0.1:5561"); - assert (rc == 0); + assert (rc != -1); // Create a publisher. void *pub = xs_socket (ctx, XS_PUB); assert (pub); rc = xs_connect (pub, "tcp://127.0.0.1:5561"); - assert (rc == 0); + assert (rc != -1); // Create a subscriber. void *sub = xs_socket (ctx, XS_SUB); assert (sub); rc = xs_connect (sub, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); // Subscribe for all messages. rc = xs_setsockopt (sub, XS_SUBSCRIBE, "", 0); diff --git a/tests/survey.cpp b/tests/survey.cpp index 9347e2d..f21b217 100644 --- a/tests/survey.cpp +++ b/tests/survey.cpp @@ -33,23 +33,23 @@ int XS_TEST_MAIN () void *xsurveyor = xs_socket (ctx, XS_XSURVEYOR); assert (xsurveyor); rc = xs_bind (xsurveyor, "inproc://a"); - assert (rc == 0); + assert (rc != -1); void *xrespondent = xs_socket (ctx, XS_XRESPONDENT); assert (xrespondent); rc = xs_bind (xrespondent, "inproc://b"); - assert (rc == 0); + assert (rc != -1); void *surveyor = xs_socket (ctx, XS_SURVEYOR); assert (surveyor); rc = xs_connect (surveyor, "inproc://b"); - assert (rc == 0); + assert (rc != -1); void *respondent1 = xs_socket (ctx, XS_RESPONDENT); assert (respondent1); rc = xs_connect (respondent1, "inproc://a"); - assert (rc == 0); + assert (rc != -1); void *respondent2 = xs_socket (ctx, XS_RESPONDENT); assert (respondent2); rc = xs_connect (respondent2, "inproc://a"); - assert (rc == 0); + assert (rc != -1); // Send the survey. rc = xs_send (surveyor, "ABC", 3, 0); diff --git a/tests/tests.cpp b/tests/tests.cpp index 3d7951c..da034e6 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -115,6 +115,10 @@ #include "survey.cpp" #undef XS_TEST_MAIN +#define XS_TEST_MAIN shutdown +#include "shutdown.cpp" +#undef XS_TEST_MAIN + int main () { int rc; @@ -161,6 +165,8 @@ int main () assert (rc == 0); rc = survey (); assert (rc == 0); + rc = shutdown () + assert (rc == 0); fprintf (stderr, "SUCCESS\n"); sleep (1); diff --git a/tests/timeo.cpp b/tests/timeo.cpp index bc73eec..fb17c85 100644 --- a/tests/timeo.cpp +++ b/tests/timeo.cpp @@ -30,7 +30,7 @@ extern "C" void *sc = xs_socket (ctx_, XS_PUSH); assert (sc); int rc = xs_connect (sc, "inproc://timeout_test"); - assert (rc == 0); + assert (rc != -1); sleep (1); rc = xs_close (sc); assert (rc == 0); @@ -48,7 +48,7 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_PULL); assert (sb); int rc = xs_bind (sb, "inproc://timeout_test"); - assert (rc == 0); + assert (rc != -1); // Check whether non-blocking recv returns immediately. char buf [] = "12345678ABCDEFGH12345678abcdefgh"; @@ -90,7 +90,7 @@ int XS_TEST_MAIN () rc = xs_setsockopt(sb, XS_SNDTIMEO, &timeout, timeout_size); assert (rc == 0); rc = xs_connect (sc, "inproc://timeout_test"); - assert (rc == 0); + assert (rc != -1); rc = xs_send (sc, buf, 32, 0); assert (rc == 32); rc = xs_recv (sb, buf, 32, 0); diff --git a/tests/wireformat.cpp b/tests/wireformat.cpp index 1177798..f3e0f96 100644 --- a/tests/wireformat.cpp +++ b/tests/wireformat.cpp @@ -50,9 +50,9 @@ int XS_TEST_MAIN () // Bind the peer and get the message. int rc = xs_bind (pull, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); rc = xs_bind (push, "tcp://127.0.0.1:5561"); - assert (rc == 0); + assert (rc != -1); // Connect to the peer using raw sockets. int rpush = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); -- cgit v1.2.3