diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2012-04-21 07:07:57 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2012-04-21 07:07:57 +0200 |
commit | 36fd87810274329c8cd86344b95a0521541e7bab (patch) | |
tree | ec183c7dd3a9b1de3361e7211cbffc960d139cf1 | |
parent | d26e86aa0afc3ec2534eacb4131aee8f6805c36a (diff) |
xs_shutdown implemented
This patch allows for partial shutdown of the socket.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
47 files changed, 355 insertions, 98 deletions
@@ -50,6 +50,7 @@ tests/wireformat tests/libzmq21 tests/resubscribe tests/survey +tests/shutdown src/platform.hpp* src/stamp-h1 perf/*.exe diff --git a/builds/msvc/tests/tests.vcxproj b/builds/msvc/tests/tests.vcxproj index d1258e4..20d2554 100644 --- a/builds/msvc/tests/tests.vcxproj +++ b/builds/msvc/tests/tests.vcxproj @@ -155,6 +155,10 @@ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild> <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild> </ClCompile> + <ClCompile Include="..\..\..\tests\shutdown.cpp"> + <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild> + <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild> + </ClCompile> <ClCompile Include="..\..\..\tests\tests.cpp" /> <ClCompile Include="..\..\..\tests\timeo.cpp"> <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild> @@ -188,4 +192,4 @@ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> <ImportGroup Label="ExtensionTargets"> </ImportGroup> -</Project>
\ No newline at end of file +</Project> diff --git a/builds/msvc/tests/tests.vcxproj.filters b/builds/msvc/tests/tests.vcxproj.filters index 0fb41a8..ea97fed 100644 --- a/builds/msvc/tests/tests.vcxproj.filters +++ b/builds/msvc/tests/tests.vcxproj.filters @@ -68,6 +68,9 @@ <ClCompile Include="..\..\..\tests\survey.cpp"> <Filter>Header Files</Filter> </ClCompile> + <ClCompile Include="..\..\..\tests\shutdown.cpp"> + <Filter>Header Files</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <Filter Include="Header Files"> @@ -79,4 +82,4 @@ <Filter>Header Files</Filter> </ClInclude> </ItemGroup> -</Project>
\ No newline at end of file +</Project> diff --git a/doc/Makefile.am b/doc/Makefile.am index 66dd87b..9f90feb 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -3,7 +3,7 @@ MAN3 = xs_bind.3 xs_close.3 xs_connect.3 xs_init.3 \ xs_msg_init_data.3 xs_msg_init_size.3 xs_msg_move.3 xs_msg_size.3 \ xs_poll.3 xs_recv.3 xs_send.3 xs_setsockopt.3 xs_socket.3 \ xs_strerror.3 xs_term.3 xs_version.3 xs_getsockopt.3 xs_errno.3 \ - xs_sendmsg.3 xs_recvmsg.3 xs_getmsgopt.3 xs_setctxopt.3 + xs_sendmsg.3 xs_recvmsg.3 xs_getmsgopt.3 xs_setctxopt.3 xs_shutdown.3 MAN7 = xs.7 xs_tcp.7 xs_pgm.7 xs_inproc.7 xs_ipc.7 xs_zmq.7 MAN_DOC = $(MAN1) $(MAN3) $(MAN7) diff --git a/doc/xs_bind.txt b/doc/xs_bind.txt index fc08124..3472f04 100644 --- a/doc/xs_bind.txt +++ b/doc/xs_bind.txt @@ -38,8 +38,8 @@ semantics involved when connecting or binding a socket to multiple endpoints. RETURN VALUE ------------ -The _xs_bind()_ function shall return zero if successful. Otherwise it shall -return `-1` and set 'errno' to one of the values defined below. +The _xs_bind()_ function shall return endpoint ID if successful. Otherwise it +shall return `-1` and set 'errno' to one of the values defined below. ERRORS @@ -71,10 +71,10 @@ void *socket = xs_socket (context, XS_PUB); assert (socket); /* Bind it to a in-process transport with the address 'my_publisher' */ int rc = xs_bind (socket, "inproc://my_publisher"); -assert (rc == 0); +assert (rc != -1); /* Bind it to a TCP transport on port 5555 of the 'eth0' interface */ rc = xs_bind (socket, "tcp://eth0:5555"); -assert (rc == 0); +assert (rc != -1); ---- diff --git a/doc/xs_connect.txt b/doc/xs_connect.txt index 898b915..02597da 100644 --- a/doc/xs_connect.txt +++ b/doc/xs_connect.txt @@ -42,8 +42,8 @@ physical connection was or can actually be established. RETURN VALUE ------------ -The _xs_connect()_ function shall return zero if successful. Otherwise it -shall return `-1` and set 'errno' to one of the values defined below. +The _xs_connect()_ function shall return endpoint ID if successful. Otherwise +it shall return `-1` and set 'errno' to one of the values defined below. ERRORS @@ -69,10 +69,10 @@ void *socket = xs_socket (context, XS_SUB); assert (socket); /* Connect it to an in-process transport with the address 'my_publisher' */ int rc = xs_connect (socket, "inproc://my_publisher"); -assert (rc == 0); +assert (rc != -1); /* Connect it to the host server001, port 5555 using a TCP transport */ rc = xs_connect (socket, "tcp://server001:5555"); -assert (rc == 0); +assert (rc != -1); ---- diff --git a/doc/xs_inproc.txt b/doc/xs_inproc.txt index 9ec255f..f0dae94 100644 --- a/doc/xs_inproc.txt +++ b/doc/xs_inproc.txt @@ -50,20 +50,20 @@ EXAMPLES ---- /* Assign the in-process name "#1" */ rc = xs_bind(socket, "inproc://#1"); -assert (rc == 0); +assert (rc != -1); /* Assign the in-process name "my-endpoint" */ rc = xs_bind(socket, "inproc://my-endpoint"); -assert (rc == 0); +assert (rc != -1); ---- .Connecting a socket ---- /* Connect to the in-process name "#1" */ rc = xs_connect(socket, "inproc://#1"); -assert (rc == 0); +assert (rc != -1); /* Connect to the in-process name "my-endpoint" */ rc = xs_connect(socket, "inproc://my-endpoint"); -assert (rc == 0); +assert (rc != -1); ---- diff --git a/doc/xs_ipc.txt b/doc/xs_ipc.txt index f3c5147..b917c78 100644 --- a/doc/xs_ipc.txt +++ b/doc/xs_ipc.txt @@ -54,14 +54,14 @@ EXAMPLES ---- /* Assign the pathname "/tmp/feeds/0" */ rc = xs_bind(socket, "ipc:///tmp/feeds/0"); -assert (rc == 0); +assert (rc != -1); ---- .Connecting a socket ---- /* Connect to the pathname "/tmp/feeds/0" */ rc = xs_connect(socket, "ipc:///tmp/feeds/0"); -assert (rc == 0); +assert (rc != -1); ---- SEE ALSO diff --git a/doc/xs_pgm.txt b/doc/xs_pgm.txt index fcce19a..2fc6ca8 100644 --- a/doc/xs_pgm.txt +++ b/doc/xs_pgm.txt @@ -137,12 +137,12 @@ EXAMPLE /* using the first Ethernet network interface on Linux */ /* and the Encapsulated PGM protocol */ rc = xs_connect(socket, "epgm://eth0;239.192.1.1:5555"); -assert (rc == 0); +assert (rc != -1); /* Connecting to the multicast address 239.192.1.1, port 5555, */ /* using the network interface with the address 192.168.1.1 */ /* and the standard PGM protocol */ rc = xs_connect(socket, "pgm://192.168.1.1;239.192.1.1:5555"); -assert (rc == 0); +assert (rc != -1); ---- diff --git a/doc/xs_setsockopt.txt b/doc/xs_setsockopt.txt index e07d9dd..d636c83 100644 --- a/doc/xs_setsockopt.txt +++ b/doc/xs_setsockopt.txt @@ -397,15 +397,15 @@ int64_t affinity; /* Incoming connections on TCP port 5555 shall be handled by I/O thread 1 */ affinity = 1; rc = xs_setsockopt (socket, XS_AFFINITY, &affinity, sizeof affinity); -assert (rc); +assert (rc == 0); rc = xs_bind (socket, "tcp://lo:5555"); -assert (rc); +assert (rc != -1); /* Incoming connections on TCP port 5556 shall be handled by I/O thread 2 */ affinity = 2; rc = xs_setsockopt (socket, XS_AFFINITY, &affinity, sizeof affinity); -assert (rc); +assert (rc == 0); rc = xs_bind (socket, "tcp://lo:5556"); -assert (rc); +assert (rc != -1); ---- diff --git a/doc/xs_shutdown.txt b/doc/xs_shutdown.txt new file mode 100644 index 0000000..8943933 --- /dev/null +++ b/doc/xs_shutdown.txt @@ -0,0 +1,69 @@ +xs_bind(3) +========== + + +NAME +---- +xs_shutdown - shut down part of the socket + + +SYNOPSIS +-------- +*int xs_shutdown (void '*socket', int 'how');* + + +DESCRIPTION +----------- +This function partially closes the socket. It disconnects or unbinds an endpoint +previously connected or bound by _xs_bind()_ or _xs_connect()_. 'how' parameter +is the endpoint ID as returned by _xs_bind()_ or _xs_connect()_. + +Endpoint shutdown honours 'linger' socket option. I.e. if there are any pending +outbound messages, Crossroads will try to push them to the network for the +specified amount of time before giving up. + +Note: inproc endpoints don't support partial shutdown at the moment. + +RETURN VALUE +------------ +The _xs_shutdown()_ function shall return zero if successful. Otherwise it +shall return `-1` and set 'errno' to one of the values defined below. + +ERRORS +------ +*EINVAL*:: +The endpoint ID supplied doesn't correspond to any active endpoint. +*ENOTSUP*:: +Specified endpoint doesn't support partial shutdown. +*ETERM*:: +The 'context' associated with the specified 'socket' was terminated. +*ENOTSOCK*:: +The provided 'socket' was invalid. + + +EXAMPLE +------- +.Binding socket to an endpoint, then unbinding it +---- +/* Create a socket */ +void *socket = xs_socket (context, XS_PUB); +assert (socket); +/* Bind it to a TCP endpoint */ +int id = xs_bind (socket, "tcp://*:5555"); +assert (id != -1); +/* Unbind the socket from the endpoint */ +rc = xs_shutdown (socket, id); +assert (rc == 0); +---- + + +SEE ALSO +-------- +linkxs:xs_connect[3] +linkxs:xs_bind[3] +linkxs:xs[7] + + +AUTHORS +------- +This manual page was written by Martin Sustrik <sustrik@250bpm.com>. diff --git a/doc/xs_tcp.txt b/doc/xs_tcp.txt index b97be08..7437e36 100644 --- a/doc/xs_tcp.txt +++ b/doc/xs_tcp.txt @@ -127,23 +127,23 @@ EXAMPLES ---- /* TCP port 5555 on all available interfaces */ rc = xs_bind(socket, "tcp://*:5555"); -assert (rc == 0); +assert (rc != -1); /* TCP port 5555 on the local loop-back interface on all platforms */ rc = xs_bind(socket, "tcp://127.0.0.1:5555"); -assert (rc == 0); +assert (rc != -1); /* TCP port 5555 on the first Ethernet network interface on Linux */ rc = xs_bind(socket, "tcp://eth0:5555"); -assert (rc == 0); +assert (rc != -1); ---- .Connecting a socket ---- /* Connecting using an IP address */ rc = xs_connect(socket, "tcp://192.168.1.1:5555"); -assert (rc == 0); +assert (rc != -1); /* Connecting using a DNS name */ rc = xs_connect(socket, "tcp://server1:5555"); -assert (rc == 0); +assert (rc != -1); ---- diff --git a/include/xs.h b/include/xs.h index 16e479d..960ba20 100644 --- a/include/xs.h +++ b/include/xs.h @@ -224,6 +224,7 @@ XS_EXPORT int xs_getsockopt (void *s, int option, void *optval, size_t *optvallen); XS_EXPORT int xs_bind (void *s, const char *addr); XS_EXPORT int xs_connect (void *s, const char *addr); +XS_EXPORT int xs_shutdown (void *s, int how); XS_EXPORT int xs_send (void *s, const void *buf, size_t len, int flags); XS_EXPORT int xs_recv (void *s, void *buf, size_t len, int flags); XS_EXPORT int xs_sendmsg (void *s, xs_msg_t *msg, int flags); diff --git a/perf/inproc_lat.cpp b/perf/inproc_lat.cpp index fc0b76b..54973db 100644 --- a/perf/inproc_lat.cpp +++ b/perf/inproc_lat.cpp @@ -55,7 +55,7 @@ static void *worker (void *ctx_) } rc = xs_connect (s, "inproc://lat_test"); - if (rc != 0) { + if (rc == -1) { printf ("error in xs_connect: %s\n", xs_strerror (errno)); exit (1); } @@ -135,7 +135,7 @@ int main (int argc, char *argv []) } rc = xs_bind (s, "inproc://lat_test"); - if (rc != 0) { + if (rc == -1) { printf ("error in xs_bind: %s\n", xs_strerror (errno)); return -1; } diff --git a/perf/inproc_thr.cpp b/perf/inproc_thr.cpp index 91f1df4..570d87e 100644 --- a/perf/inproc_thr.cpp +++ b/perf/inproc_thr.cpp @@ -55,7 +55,7 @@ static void *worker (void *ctx_) } rc = xs_connect (s, "inproc://thr_test"); - if (rc != 0) { + if (rc == -1) { printf ("error in xs_connect: %s\n", xs_strerror (errno)); exit (1); } @@ -134,7 +134,7 @@ int main (int argc, char *argv []) } rc = xs_bind (s, "inproc://thr_test"); - if (rc != 0) { + if (rc == -1) { printf ("error in xs_bind: %s\n", xs_strerror (errno)); return -1; } diff --git a/perf/local_lat.cpp b/perf/local_lat.cpp index 3dd1c8c..1fcf3f4 100644 --- a/perf/local_lat.cpp +++ b/perf/local_lat.cpp @@ -56,7 +56,7 @@ int main (int argc, char *argv []) } rc = xs_bind (s, bind_to); - if (rc != 0) { + if (rc == -1) { printf ("error in xs_bind: %s\n", xs_strerror (errno)); return -1; } diff --git a/perf/local_thr.cpp b/perf/local_thr.cpp index 40fdbec..5a9e6f1 100644 --- a/perf/local_thr.cpp +++ b/perf/local_thr.cpp @@ -62,7 +62,7 @@ int main (int argc, char *argv []) // For example XS_RATE, XS_RECOVERY_IVL and XS_MCAST_LOOP for PGM. rc = xs_bind (s, bind_to); - if (rc != 0) { + if (rc == -1) { printf ("error in xs_bind: %s\n", xs_strerror (errno)); return -1; } diff --git a/perf/remote_lat.cpp b/perf/remote_lat.cpp index 0f05d2a..6aaf1e9 100644 --- a/perf/remote_lat.cpp +++ b/perf/remote_lat.cpp @@ -60,7 +60,7 @@ int main (int argc, char *argv []) } rc = xs_connect (s, connect_to); - if (rc != 0) { + if (rc == -1) { printf ("error in xs_connect: %s\n", xs_strerror (errno)); return -1; } diff --git a/perf/remote_thr.cpp b/perf/remote_thr.cpp index fe4bbfa..b93c4d7 100644 --- a/perf/remote_thr.cpp +++ b/perf/remote_thr.cpp @@ -60,7 +60,7 @@ int main (int argc, char *argv []) // For example XS_RATE, XS_RECOVERY_IVL and XS_MCAST_LOOP for PGM. rc = xs_connect (s, connect_to); - if (rc != 0) { + if (rc == -1) { printf ("error in xs_connect: %s\n", xs_strerror (errno)); return -1; } diff --git a/src/own.hpp b/src/own.hpp index a2f0e9f..2d47a39 100644 --- a/src/own.hpp +++ b/src/own.hpp @@ -67,6 +67,12 @@ namespace xs protected: + // Handlers for incoming commands. + void process_own (own_t *object_); + void process_term_req (own_t *object_); + void process_term_ack (); + void process_seqnum (); + // Launch the supplied object and become its owner. void launch_child (own_t *object_); @@ -101,12 +107,6 @@ namespace xs // Set owner of the object void set_owner (own_t *owner_); - // Handlers for incoming commands. - void process_own (own_t *object_); - void process_term_req (own_t *object_); - void process_term_ack (); - void process_seqnum (); - // Check whether all the peding term acks were delivered. // If so, deallocate this object. void check_term_acks (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 3e57c28..ec5c8bd 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -347,7 +347,12 @@ int xs::socket_base_t::bind (const char *addr_) if (protocol == "inproc") { endpoint_t endpoint = {this, options}; rc = register_endpoint (addr_, endpoint); - return rc; + if (rc != 0) + return -1; + + // Endpoint IDs for inproc transport are not implemented at the + // moment. Thus we return 0 to the user. + return 0; } if (protocol == "pgm" || protocol == "epgm") { @@ -373,7 +378,7 @@ int xs::socket_base_t::bind (const char *addr_) return -1; } launch_child (listener); - return 0; + return add_endpoint (listener); } #if !defined XS_HAVE_WINDOWS && !defined XS_HAVE_OPENVMS @@ -387,7 +392,7 @@ int xs::socket_base_t::bind (const char *addr_) return -1; } launch_child (listener); - return 0; + return add_endpoint (listener); } #endif @@ -478,6 +483,7 @@ int xs::socket_base_t::connect (const char *addr_) // increased here. send_bind (peer.socket, ppair [1], false); + // Inproc endpoints are not yet implemented thus we return 0. return 0; } @@ -512,7 +518,32 @@ int xs::socket_base_t::connect (const char *addr_) // Activate the session. Make it a child of this socket. launch_child (session); + return add_endpoint (session); +} + +int xs::socket_base_t::shutdown (int how_) +{ + // Check whether the library haven't been shut down yet. + if (unlikely (ctx_terminated)) { + errno = ETERM; + return -1; + } + // Endpoint ID means 'shutdown not implemented'. + if (how_ <= 0) { + errno = ENOTSUP; + return -1; + } + + // Find the endpoint corresponding to the ID. + endpoints_t::iterator it = endpoints.find (how_); + if (it == endpoints.end ()) { + errno = EINVAL; + return -1; + } + + process_term_req (it->second); + endpoints.erase (it); return 0; } @@ -937,3 +968,18 @@ uint64_t xs::socket_base_t::now_ms () { return clock.now_ms (); } + +int xs::socket_base_t::add_endpoint (own_t *endpoint_) +{ + // Get a unique endpoint ID. + int id = 1; + for (endpoints_t::iterator it = endpoints.begin (); it != endpoints.end (); + ++it, ++id) + if (it->first != id) + break; + + // Remember the endpoint. + endpoints.insert (std::make_pair (id, endpoint_)); + return id; +} + diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 54a5f0a..f73c413 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -23,6 +23,7 @@ #ifndef __XS_SOCKET_BASE_HPP_INCLUDED__ #define __XS_SOCKET_BASE_HPP_INCLUDED__ +#include <map> #include <string> #include "own.hpp" @@ -71,6 +72,7 @@ namespace xs int getsockopt (int option_, void *optval_, size_t *optvallen_); int bind (const char *addr_); int connect (const char *addr_); + int shutdown (int how_); int send (xs::msg_t *msg_, int flags_); int recv (xs::msg_t *msg_, int flags_); int close (); @@ -151,6 +153,9 @@ namespace xs // to be later retrieved by getsockopt. void extract_flags (msg_t *msg_); + // Creates new endpoint ID and adds the endpoint to the map. + int add_endpoint (own_t *endpoint_); + // Used to check whether the object is a socket. uint32_t tag; @@ -209,6 +214,10 @@ namespace xs // Improves efficiency of time measurement. clock_t clock; + // Map of open endpoints. + typedef std::map <int, own_t*> endpoints_t; + endpoints_t endpoints; + socket_base_t (const socket_base_t&); const socket_base_t &operator = (const socket_base_t&); }; @@ -212,6 +212,17 @@ int xs_connect (void *s_, const char *addr_) return rc; } +int xs_shutdown (void *s_, int how_) +{ + xs::socket_base_t *s = (xs::socket_base_t*) s_; + if (!s || !s->check_tag ()) { + errno = ENOTSOCK; + return -1; + } + int rc = s->shutdown (how_); + return rc; +} + int xs_send (void *s_, const void *buf_, size_t len_, int flags_) { xs_msg_t msg; diff --git a/src/xszmq.cpp b/src/xszmq.cpp index d7199f9..f6e6fa1 100644 --- a/src/xszmq.cpp +++ b/src/xszmq.cpp @@ -355,12 +355,14 @@ int zmq_getsockopt (void *s, int option, void *optval, int zmq_bind (void *s, const char *addr) { - return xs_bind (s, addr); + int rc = xs_bind (s, addr); + return rc < 0 ? -1 : 0; } int zmq_connect (void *s, const char *addr) { - return xs_connect (s, addr); + int rc = xs_connect (s, addr); + return rc < 0 ? -1 : 0; } int zmq_send (void *s, zmq_msg_t *msg, int flags) diff --git a/tests/Makefile.am b/tests/Makefile.am index ba76260..47a880b 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -24,7 +24,8 @@ noinst_PROGRAMS = pair_inproc \ wireformat \ libzmq21 \ resubscribe \ - survey + survey \ + shutdown pair_inproc_SOURCES = pair_inproc.cpp testutil.hpp pair_tcp_SOURCES = pair_tcp.cpp testutil.hpp @@ -48,5 +49,6 @@ wireformat_SOURCES = wireformat.cpp libzmq21_SOURCES = libzmq21.cpp resubscribe_SOURCES = resubscribe.cpp survey_SOURCES = survey.cpp +shutdown_SOURCES = shutdown.cpp TESTS = $(noinst_PROGRAMS) diff --git a/tests/hwm.cpp b/tests/hwm.cpp index 6cd9a41..93e8910 100644 --- a/tests/hwm.cpp +++ b/tests/hwm.cpp @@ -35,14 +35,14 @@ int XS_TEST_MAIN () int rc = xs_setsockopt (sb, XS_RCVHWM, &hwm, sizeof (hwm)); assert (rc == 0); rc = xs_bind (sb, "inproc://a"); - assert (rc == 0); + assert (rc != -1); void *sc = xs_socket (ctx, XS_PUSH); assert (sc); rc = xs_setsockopt (sc, XS_SNDHWM, &hwm, sizeof (hwm)); assert (rc == 0); rc = xs_connect (sc, "inproc://a"); - assert (rc == 0); + assert (rc != -1); // Try to send 10 messages. Only 4 should succeed. for (int i = 0; i < 10; i++) diff --git a/tests/invalid_rep.cpp b/tests/invalid_rep.cpp index cd0ad8e..93118b6 100644 --- a/tests/invalid_rep.cpp +++ b/tests/invalid_rep.cpp @@ -38,9 +38,9 @@ int XS_TEST_MAIN () rc = xs_setsockopt (req_socket, XS_LINGER, &linger, sizeof (int)); assert (rc == 0); rc = xs_bind (xrep_socket, "inproc://hi"); - assert (rc == 0); + assert (rc != -1); rc = xs_connect (req_socket, "inproc://hi"); - assert (rc == 0); + assert (rc != -1); // Initial request. rc = xs_send (req_socket, "r", 1, 0); diff --git a/tests/libzmq21.cpp b/tests/libzmq21.cpp index e7affae..ecf271b 100644 --- a/tests/libzmq21.cpp +++ b/tests/libzmq21.cpp @@ -57,7 +57,7 @@ int XS_TEST_MAIN () int rc = xs_setsockopt (pub, XS_PROTOCOL, &protocol, sizeof (protocol)); assert (rc == 0); rc = xs_bind (pub, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); int oldsub = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); struct sockaddr_in address; @@ -105,7 +105,7 @@ int XS_TEST_MAIN () rc = xs_setsockopt (sub, XS_SUBSCRIBE, "", 0); assert (rc == 0); rc = xs_bind (sub, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); int oldpub = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); address.sin_family = AF_INET; diff --git a/tests/linger.cpp b/tests/linger.cpp index fec0f72..1b1d341 100644 --- a/tests/linger.cpp +++ b/tests/linger.cpp @@ -37,7 +37,7 @@ int XS_TEST_MAIN () // Connect to non-existent endpoing. assert (rc == 0); rc = xs_connect (s, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); // Send a message. rc = xs_send (s, "r", 1, 0); diff --git a/tests/msg_flags.cpp b/tests/msg_flags.cpp index eade40f..3f3b3b1 100644 --- a/tests/msg_flags.cpp +++ b/tests/msg_flags.cpp @@ -30,11 +30,11 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_XREP); assert (sb); int rc = xs_bind (sb, "inproc://a"); - assert (rc == 0); + assert (rc != -1); void *sc = xs_socket (ctx, XS_XREQ); assert (sc); rc = xs_connect (sc, "inproc://a"); - assert (rc == 0); + assert (rc != -1); // Send 2-part message. rc = xs_send (sc, "A", 1, XS_SNDMORE); diff --git a/tests/pair_inproc.cpp b/tests/pair_inproc.cpp index c02d08b..456a371 100644 --- a/tests/pair_inproc.cpp +++ b/tests/pair_inproc.cpp @@ -30,12 +30,12 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_PAIR); assert (sb); int rc = xs_bind (sb, "inproc://a"); - assert (rc == 0); + assert (rc != -1); void *sc = xs_socket (ctx, XS_PAIR); assert (sc); rc = xs_connect (sc, "inproc://a"); - assert (rc == 0); + assert (rc != -1); bounce (sb, sc); diff --git a/tests/pair_ipc.cpp b/tests/pair_ipc.cpp index 43915cc..ef39bfc 100644 --- a/tests/pair_ipc.cpp +++ b/tests/pair_ipc.cpp @@ -37,12 +37,12 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_PAIR); assert (sb); int rc = xs_bind (sb, "ipc:///tmp/tester"); - assert (rc == 0); + assert (rc != -1); void *sc = xs_socket (ctx, XS_PAIR); assert (sc); rc = xs_connect (sc, "ipc:///tmp/tester"); - assert (rc == 0); + assert (rc != -1); bounce (sb, sc); diff --git a/tests/pair_tcp.cpp b/tests/pair_tcp.cpp index a0decb9..1df90ad 100644 --- a/tests/pair_tcp.cpp +++ b/tests/pair_tcp.cpp @@ -31,12 +31,12 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_PAIR); assert (sb); int rc = xs_bind (sb, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); void *sc = xs_socket (ctx, XS_PAIR); assert (sc); rc = xs_connect (sc, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); bounce (sb, sc); diff --git a/tests/polltimeo.cpp b/tests/polltimeo.cpp index ca88393..f425593 100644 --- a/tests/polltimeo.cpp +++ b/tests/polltimeo.cpp @@ -30,7 +30,7 @@ extern "C" void *sc = xs_socket (ctx_, XS_PUSH); assert (sc); int rc = xs_connect (sc, "inproc://timeout_test"); - assert (rc == 0); + assert (rc != -1); sleep (1); rc = xs_close (sc); assert (rc == 0); @@ -48,7 +48,7 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_PULL); assert (sb); int rc = xs_bind (sb, "inproc://timeout_test"); - assert (rc == 0); + assert (rc != -1); // Check whether timeout is honoured. xs_pollitem_t pi; diff --git a/tests/reconnect.cpp b/tests/reconnect.cpp index c677527..ebbaf32 100644 --- a/tests/reconnect.cpp +++ b/tests/reconnect.cpp @@ -34,7 +34,7 @@ int XS_TEST_MAIN () // Connect before bind was done at the peer and send one message. int rc = xs_connect (push, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); rc = xs_send (push, "ABC", 3, 0); assert (rc == 3); @@ -43,7 +43,7 @@ int XS_TEST_MAIN () // Bind the peer and get the message. rc = xs_bind (pull, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); unsigned char buf [3]; rc = xs_recv (pull, buf, sizeof (buf), 0); assert (rc == 3); @@ -64,7 +64,7 @@ int XS_TEST_MAIN () // Connect before bind was done at the peer and send one message. rc = xs_connect (push, "ipc:///tmp/tester"); - assert (rc == 0); + assert (rc != -1); rc = xs_send (push, "ABC", 3, 0); assert (rc == 3); @@ -73,7 +73,7 @@ int XS_TEST_MAIN () // Bind the peer and get the message. rc = xs_bind (pull, "ipc:///tmp/tester"); - assert (rc == 0); + assert (rc != -1); rc = xs_recv (pull, buf, sizeof (buf), 0); assert (rc == 3); diff --git a/tests/reqrep_device.cpp b/tests/reqrep_device.cpp index 81960d9..732f4dc 100644 --- a/tests/reqrep_device.cpp +++ b/tests/reqrep_device.cpp @@ -32,23 +32,23 @@ int XS_TEST_MAIN () void *xreq = xs_socket (ctx, XS_XREQ); assert (xreq); int rc = xs_bind (xreq, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); void *xrep = xs_socket (ctx, XS_XREP); assert (xrep); rc = xs_bind (xrep, "tcp://127.0.0.1:5561"); - assert (rc == 0); + assert (rc != -1); // Create a worker. void *rep = xs_socket (ctx, XS_REP); assert (rep); rc = xs_connect (rep, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); // Create a client. void *req = xs_socket (ctx, XS_REQ); assert (req); rc = xs_connect (req, "tcp://127.0.0.1:5561"); - assert (rc == 0); + assert (rc != -1); // Send a request. rc = xs_send (req, "ABC", 3, XS_SNDMORE); diff --git a/tests/reqrep_inproc.cpp b/tests/reqrep_inproc.cpp index c66b82b..bd5cc92 100644 --- a/tests/reqrep_inproc.cpp +++ b/tests/reqrep_inproc.cpp @@ -30,12 +30,12 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_REP); assert (sb); int rc = xs_bind (sb, "inproc://a"); - assert (rc == 0); + assert (rc != -1); void *sc = xs_socket (ctx, XS_REQ); assert (sc); rc = xs_connect (sc, "inproc://a"); - assert (rc == 0); + assert (rc != -1); bounce (sb, sc); diff --git a/tests/reqrep_ipc.cpp b/tests/reqrep_ipc.cpp index 9be50fb..66315ef 100644 --- a/tests/reqrep_ipc.cpp +++ b/tests/reqrep_ipc.cpp @@ -37,12 +37,12 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_REP); assert (sb); int rc = xs_bind (sb, "ipc:///tmp/tester"); - assert (rc == 0); + assert (rc != -1); void *sc = xs_socket (ctx, XS_REQ); assert (sc); rc = xs_connect (sc, "ipc:///tmp/tester"); - assert (rc == 0); + assert (rc != -1); bounce (sb, sc); diff --git a/tests/reqrep_tcp.cpp b/tests/reqrep_tcp.cpp index b11d5a1..f68d762 100644 --- a/tests/reqrep_tcp.cpp +++ b/tests/reqrep_tcp.cpp @@ -31,12 +31,12 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_REP); assert (sb); int rc = xs_bind (sb, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); void *sc = xs_socket (ctx, XS_REQ); assert (sc); rc = xs_connect (sc, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); bounce (sb, sc); diff --git a/tests/resubscribe.cpp b/tests/resubscribe.cpp index 1924434..5d78712 100644 --- a/tests/resubscribe.cpp +++ b/tests/resubscribe.cpp @@ -34,13 +34,13 @@ int XS_TEST_MAIN () // Send two subscriptions upstream. int rc = xs_bind (xpub, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); rc = xs_setsockopt (sub, XS_SUBSCRIBE, "a", 1); assert (rc == 0); rc = xs_setsockopt (sub, XS_SUBSCRIBE, "b", 1); assert (rc == 0); rc = xs_connect (sub, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); // Check whether subscriptions are correctly received. char buf [5]; @@ -68,7 +68,7 @@ int XS_TEST_MAIN () xpub = xs_socket (ctx, XS_XPUB); assert (xpub); rc = xs_bind (xpub, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); // We have to give control to the SUB socket here so that it has // chance to resend the subscriptions. diff --git a/tests/shutdown.cpp b/tests/shutdown.cpp new file mode 100644 index 0000000..9f055a1 --- /dev/null +++ b/tests/shutdown.cpp @@ -0,0 +1,103 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "testutil.hpp" + +int XS_TEST_MAIN () +{ + int rc; + char buf [32]; + + fprintf (stderr, "shutdown test running...\n"); + + // Create infrastructure. + void *ctx = xs_init (); + assert (ctx); + void *push = xs_socket (ctx, XS_PUSH); + assert (push); + int push_id = xs_bind (push, "tcp://127.0.0.1:5560"); + assert (push_id != -1); + void *pull = xs_socket (ctx, XS_PULL); + assert (pull); + rc = xs_connect (pull, "tcp://127.0.0.1:5560"); + assert (rc != -1); + + // Pass one message through to ensure the connection is established. + rc = xs_send (push, "ABC", 3, 0); + assert (rc == 3); + rc = xs_recv (pull, buf, sizeof (buf), 0); + assert (rc == 3); + + // Shut down the bound endpoint. + rc = xs_shutdown (push, push_id); + assert (rc == 0); + sleep (1); + + // Check that sending would block (there's no outbound connection). + rc = xs_send (push, "ABC", 3, XS_DONTWAIT); + assert (rc == -1 && xs_errno () == EAGAIN); + + // Clean up. + rc = xs_close (pull); + assert (rc == 0); + rc = xs_close (push); + assert (rc == 0); + rc = xs_term (ctx); + assert (rc == 0); + + // Now the other way round. + + // Create infrastructure. + ctx = xs_init (); + assert (ctx); + pull = xs_socket (ctx, XS_PULL); + assert (pull); + rc = xs_bind (pull, "tcp://127.0.0.1:5560"); + assert (rc != -1); + push = xs_socket (ctx, XS_PUSH); + assert (push); + push_id = xs_connect (push, "tcp://127.0.0.1:5560"); + assert (push_id != -1); + + // Pass one message through to ensure the connection is established. + rc = xs_send (push, "ABC", 3, 0); + assert (rc == 3); + rc = xs_recv (pull, buf, sizeof (buf), 0); + assert (rc == 3); + + // Shut down the bound endpoint. + rc = xs_shutdown (push, push_id); + assert (rc == 0); + sleep (1); + + // Check that sending would block (there's no outbound connection). + rc = xs_send (push, "ABC", 3, XS_DONTWAIT); + assert (rc == -1 && xs_errno () == EAGAIN); + + // Clean up. + rc = xs_close (pull); + assert (rc == 0); + rc = xs_close (push); + assert (rc == 0); + rc = xs_term (ctx); + assert (rc == 0); + + return 0; +} diff --git a/tests/shutdown_stress.cpp b/tests/shutdown_stress.cpp index f106cc0..2b598a9 100644 --- a/tests/shutdown_stress.cpp +++ b/tests/shutdown_stress.cpp @@ -30,7 +30,7 @@ extern "C" int rc; rc = xs_connect (s_, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); // Start closing the socket while the connecting process is underway. rc = xs_close (s_); @@ -65,7 +65,7 @@ int XS_TEST_MAIN () assert (s1); rc = xs_bind (s1, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); for (i = 0; i != THREAD_COUNT; i++) { s2 = xs_socket (ctx, XS_SUB); diff --git a/tests/sub_forward.cpp b/tests/sub_forward.cpp index 6d385de..0237352 100644 --- a/tests/sub_forward.cpp +++ b/tests/sub_forward.cpp @@ -32,23 +32,23 @@ int XS_TEST_MAIN () void *xpub = xs_socket (ctx, XS_XPUB); assert (xpub); int rc = xs_bind (xpub, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); void *xsub = xs_socket (ctx, XS_XSUB); assert (xsub); rc = xs_bind (xsub, "tcp://127.0.0.1:5561"); - assert (rc == 0); + assert (rc != -1); // Create a publisher. void *pub = xs_socket (ctx, XS_PUB); assert (pub); rc = xs_connect (pub, "tcp://127.0.0.1:5561"); - assert (rc == 0); + assert (rc != -1); // Create a subscriber. void *sub = xs_socket (ctx, XS_SUB); assert (sub); rc = xs_connect (sub, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); // Subscribe for all messages. rc = xs_setsockopt (sub, XS_SUBSCRIBE, "", 0); diff --git a/tests/survey.cpp b/tests/survey.cpp index 9347e2d..f21b217 100644 --- a/tests/survey.cpp +++ b/tests/survey.cpp @@ -33,23 +33,23 @@ int XS_TEST_MAIN () void *xsurveyor = xs_socket (ctx, XS_XSURVEYOR); assert (xsurveyor); rc = xs_bind (xsurveyor, "inproc://a"); - assert (rc == 0); + assert (rc != -1); void *xrespondent = xs_socket (ctx, XS_XRESPONDENT); assert (xrespondent); rc = xs_bind (xrespondent, "inproc://b"); - assert (rc == 0); + assert (rc != -1); void *surveyor = xs_socket (ctx, XS_SURVEYOR); assert (surveyor); rc = xs_connect (surveyor, "inproc://b"); - assert (rc == 0); + assert (rc != -1); void *respondent1 = xs_socket (ctx, XS_RESPONDENT); assert (respondent1); rc = xs_connect (respondent1, "inproc://a"); - assert (rc == 0); + assert (rc != -1); void *respondent2 = xs_socket (ctx, XS_RESPONDENT); assert (respondent2); rc = xs_connect (respondent2, "inproc://a"); - assert (rc == 0); + assert (rc != -1); // Send the survey. rc = xs_send (surveyor, "ABC", 3, 0); diff --git a/tests/tests.cpp b/tests/tests.cpp index 3d7951c..da034e6 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -115,6 +115,10 @@ #include "survey.cpp" #undef XS_TEST_MAIN +#define XS_TEST_MAIN shutdown +#include "shutdown.cpp" +#undef XS_TEST_MAIN + int main () { int rc; @@ -161,6 +165,8 @@ int main () assert (rc == 0); rc = survey (); assert (rc == 0); + rc = shutdown () + assert (rc == 0); fprintf (stderr, "SUCCESS\n"); sleep (1); diff --git a/tests/timeo.cpp b/tests/timeo.cpp index bc73eec..fb17c85 100644 --- a/tests/timeo.cpp +++ b/tests/timeo.cpp @@ -30,7 +30,7 @@ extern "C" void *sc = xs_socket (ctx_, XS_PUSH); assert (sc); int rc = xs_connect (sc, "inproc://timeout_test"); - assert (rc == 0); + assert (rc != -1); sleep (1); rc = xs_close (sc); assert (rc == 0); @@ -48,7 +48,7 @@ int XS_TEST_MAIN () void *sb = xs_socket (ctx, XS_PULL); assert (sb); int rc = xs_bind (sb, "inproc://timeout_test"); - assert (rc == 0); + assert (rc != -1); // Check whether non-blocking recv returns immediately. char buf [] = "12345678ABCDEFGH12345678abcdefgh"; @@ -90,7 +90,7 @@ int XS_TEST_MAIN () rc = xs_setsockopt(sb, XS_SNDTIMEO, &timeout, timeout_size); assert (rc == 0); rc = xs_connect (sc, "inproc://timeout_test"); - assert (rc == 0); + assert (rc != -1); rc = xs_send (sc, buf, 32, 0); assert (rc == 32); rc = xs_recv (sb, buf, 32, 0); diff --git a/tests/wireformat.cpp b/tests/wireformat.cpp index 1177798..f3e0f96 100644 --- a/tests/wireformat.cpp +++ b/tests/wireformat.cpp @@ -50,9 +50,9 @@ int XS_TEST_MAIN () // Bind the peer and get the message. int rc = xs_bind (pull, "tcp://127.0.0.1:5560"); - assert (rc == 0); + assert (rc != -1); rc = xs_bind (push, "tcp://127.0.0.1:5561"); - assert (rc == 0); + assert (rc != -1); // Connect to the peer using raw sockets. int rpush = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); |