summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-x.gitignore1
-rw-r--r--builds/msvc/tests/tests.vcxproj6
-rw-r--r--builds/msvc/tests/tests.vcxproj.filters5
-rw-r--r--doc/Makefile.am2
-rw-r--r--doc/xs_bind.txt8
-rw-r--r--doc/xs_connect.txt8
-rw-r--r--doc/xs_inproc.txt8
-rw-r--r--doc/xs_ipc.txt4
-rw-r--r--doc/xs_pgm.txt4
-rw-r--r--doc/xs_setsockopt.txt8
-rw-r--r--doc/xs_shutdown.txt69
-rw-r--r--doc/xs_tcp.txt10
-rw-r--r--include/xs.h1
-rw-r--r--perf/inproc_lat.cpp4
-rw-r--r--perf/inproc_thr.cpp4
-rw-r--r--perf/local_lat.cpp2
-rw-r--r--perf/local_thr.cpp2
-rw-r--r--perf/remote_lat.cpp2
-rw-r--r--perf/remote_thr.cpp2
-rw-r--r--src/own.hpp12
-rw-r--r--src/socket_base.cpp52
-rw-r--r--src/socket_base.hpp9
-rw-r--r--src/xs.cpp11
-rw-r--r--src/xszmq.cpp6
-rw-r--r--tests/Makefile.am4
-rw-r--r--tests/hwm.cpp4
-rw-r--r--tests/invalid_rep.cpp4
-rw-r--r--tests/libzmq21.cpp4
-rw-r--r--tests/linger.cpp2
-rw-r--r--tests/msg_flags.cpp4
-rw-r--r--tests/pair_inproc.cpp4
-rw-r--r--tests/pair_ipc.cpp4
-rw-r--r--tests/pair_tcp.cpp4
-rw-r--r--tests/polltimeo.cpp4
-rw-r--r--tests/reconnect.cpp8
-rw-r--r--tests/reqrep_device.cpp8
-rw-r--r--tests/reqrep_inproc.cpp4
-rw-r--r--tests/reqrep_ipc.cpp4
-rw-r--r--tests/reqrep_tcp.cpp4
-rw-r--r--tests/resubscribe.cpp6
-rw-r--r--tests/shutdown.cpp103
-rw-r--r--tests/shutdown_stress.cpp4
-rw-r--r--tests/sub_forward.cpp8
-rw-r--r--tests/survey.cpp10
-rw-r--r--tests/tests.cpp6
-rw-r--r--tests/timeo.cpp6
-rw-r--r--tests/wireformat.cpp4
47 files changed, 355 insertions, 98 deletions
diff --git a/.gitignore b/.gitignore
index 2db7190..3c207b9 100755
--- a/.gitignore
+++ b/.gitignore
@@ -50,6 +50,7 @@ tests/wireformat
tests/libzmq21
tests/resubscribe
tests/survey
+tests/shutdown
src/platform.hpp*
src/stamp-h1
perf/*.exe
diff --git a/builds/msvc/tests/tests.vcxproj b/builds/msvc/tests/tests.vcxproj
index d1258e4..20d2554 100644
--- a/builds/msvc/tests/tests.vcxproj
+++ b/builds/msvc/tests/tests.vcxproj
@@ -155,6 +155,10 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
</ClCompile>
+ <ClCompile Include="..\..\..\tests\shutdown.cpp">
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
+ </ClCompile>
<ClCompile Include="..\..\..\tests\tests.cpp" />
<ClCompile Include="..\..\..\tests\timeo.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
@@ -188,4 +192,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
-</Project> \ No newline at end of file
+</Project>
diff --git a/builds/msvc/tests/tests.vcxproj.filters b/builds/msvc/tests/tests.vcxproj.filters
index 0fb41a8..ea97fed 100644
--- a/builds/msvc/tests/tests.vcxproj.filters
+++ b/builds/msvc/tests/tests.vcxproj.filters
@@ -68,6 +68,9 @@
<ClCompile Include="..\..\..\tests\survey.cpp">
<Filter>Header Files</Filter>
</ClCompile>
+ <ClCompile Include="..\..\..\tests\shutdown.cpp">
+ <Filter>Header Files</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<Filter Include="Header Files">
@@ -79,4 +82,4 @@
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>
-</Project> \ No newline at end of file
+</Project>
diff --git a/doc/Makefile.am b/doc/Makefile.am
index 66dd87b..9f90feb 100644
--- a/doc/Makefile.am
+++ b/doc/Makefile.am
@@ -3,7 +3,7 @@ MAN3 = xs_bind.3 xs_close.3 xs_connect.3 xs_init.3 \
xs_msg_init_data.3 xs_msg_init_size.3 xs_msg_move.3 xs_msg_size.3 \
xs_poll.3 xs_recv.3 xs_send.3 xs_setsockopt.3 xs_socket.3 \
xs_strerror.3 xs_term.3 xs_version.3 xs_getsockopt.3 xs_errno.3 \
- xs_sendmsg.3 xs_recvmsg.3 xs_getmsgopt.3 xs_setctxopt.3
+ xs_sendmsg.3 xs_recvmsg.3 xs_getmsgopt.3 xs_setctxopt.3 xs_shutdown.3
MAN7 = xs.7 xs_tcp.7 xs_pgm.7 xs_inproc.7 xs_ipc.7 xs_zmq.7
MAN_DOC = $(MAN1) $(MAN3) $(MAN7)
diff --git a/doc/xs_bind.txt b/doc/xs_bind.txt
index fc08124..3472f04 100644
--- a/doc/xs_bind.txt
+++ b/doc/xs_bind.txt
@@ -38,8 +38,8 @@ semantics involved when connecting or binding a socket to multiple endpoints.
RETURN VALUE
------------
-The _xs_bind()_ function shall return zero if successful. Otherwise it shall
-return `-1` and set 'errno' to one of the values defined below.
+The _xs_bind()_ function shall return endpoint ID if successful. Otherwise it
+shall return `-1` and set 'errno' to one of the values defined below.
ERRORS
@@ -71,10 +71,10 @@ void *socket = xs_socket (context, XS_PUB);
assert (socket);
/* Bind it to a in-process transport with the address 'my_publisher' */
int rc = xs_bind (socket, "inproc://my_publisher");
-assert (rc == 0);
+assert (rc != -1);
/* Bind it to a TCP transport on port 5555 of the 'eth0' interface */
rc = xs_bind (socket, "tcp://eth0:5555");
-assert (rc == 0);
+assert (rc != -1);
----
diff --git a/doc/xs_connect.txt b/doc/xs_connect.txt
index 898b915..02597da 100644
--- a/doc/xs_connect.txt
+++ b/doc/xs_connect.txt
@@ -42,8 +42,8 @@ physical connection was or can actually be established.
RETURN VALUE
------------
-The _xs_connect()_ function shall return zero if successful. Otherwise it
-shall return `-1` and set 'errno' to one of the values defined below.
+The _xs_connect()_ function shall return endpoint ID if successful. Otherwise
+it shall return `-1` and set 'errno' to one of the values defined below.
ERRORS
@@ -69,10 +69,10 @@ void *socket = xs_socket (context, XS_SUB);
assert (socket);
/* Connect it to an in-process transport with the address 'my_publisher' */
int rc = xs_connect (socket, "inproc://my_publisher");
-assert (rc == 0);
+assert (rc != -1);
/* Connect it to the host server001, port 5555 using a TCP transport */
rc = xs_connect (socket, "tcp://server001:5555");
-assert (rc == 0);
+assert (rc != -1);
----
diff --git a/doc/xs_inproc.txt b/doc/xs_inproc.txt
index 9ec255f..f0dae94 100644
--- a/doc/xs_inproc.txt
+++ b/doc/xs_inproc.txt
@@ -50,20 +50,20 @@ EXAMPLES
----
/* Assign the in-process name "#1" */
rc = xs_bind(socket, "inproc://#1");
-assert (rc == 0);
+assert (rc != -1);
/* Assign the in-process name "my-endpoint" */
rc = xs_bind(socket, "inproc://my-endpoint");
-assert (rc == 0);
+assert (rc != -1);
----
.Connecting a socket
----
/* Connect to the in-process name "#1" */
rc = xs_connect(socket, "inproc://#1");
-assert (rc == 0);
+assert (rc != -1);
/* Connect to the in-process name "my-endpoint" */
rc = xs_connect(socket, "inproc://my-endpoint");
-assert (rc == 0);
+assert (rc != -1);
----
diff --git a/doc/xs_ipc.txt b/doc/xs_ipc.txt
index f3c5147..b917c78 100644
--- a/doc/xs_ipc.txt
+++ b/doc/xs_ipc.txt
@@ -54,14 +54,14 @@ EXAMPLES
----
/* Assign the pathname "/tmp/feeds/0" */
rc = xs_bind(socket, "ipc:///tmp/feeds/0");
-assert (rc == 0);
+assert (rc != -1);
----
.Connecting a socket
----
/* Connect to the pathname "/tmp/feeds/0" */
rc = xs_connect(socket, "ipc:///tmp/feeds/0");
-assert (rc == 0);
+assert (rc != -1);
----
SEE ALSO
diff --git a/doc/xs_pgm.txt b/doc/xs_pgm.txt
index fcce19a..2fc6ca8 100644
--- a/doc/xs_pgm.txt
+++ b/doc/xs_pgm.txt
@@ -137,12 +137,12 @@ EXAMPLE
/* using the first Ethernet network interface on Linux */
/* and the Encapsulated PGM protocol */
rc = xs_connect(socket, "epgm://eth0;239.192.1.1:5555");
-assert (rc == 0);
+assert (rc != -1);
/* Connecting to the multicast address 239.192.1.1, port 5555, */
/* using the network interface with the address 192.168.1.1 */
/* and the standard PGM protocol */
rc = xs_connect(socket, "pgm://192.168.1.1;239.192.1.1:5555");
-assert (rc == 0);
+assert (rc != -1);
----
diff --git a/doc/xs_setsockopt.txt b/doc/xs_setsockopt.txt
index e07d9dd..d636c83 100644
--- a/doc/xs_setsockopt.txt
+++ b/doc/xs_setsockopt.txt
@@ -397,15 +397,15 @@ int64_t affinity;
/* Incoming connections on TCP port 5555 shall be handled by I/O thread 1 */
affinity = 1;
rc = xs_setsockopt (socket, XS_AFFINITY, &affinity, sizeof affinity);
-assert (rc);
+assert (rc == 0);
rc = xs_bind (socket, "tcp://lo:5555");
-assert (rc);
+assert (rc != -1);
/* Incoming connections on TCP port 5556 shall be handled by I/O thread 2 */
affinity = 2;
rc = xs_setsockopt (socket, XS_AFFINITY, &affinity, sizeof affinity);
-assert (rc);
+assert (rc == 0);
rc = xs_bind (socket, "tcp://lo:5556");
-assert (rc);
+assert (rc != -1);
----
diff --git a/doc/xs_shutdown.txt b/doc/xs_shutdown.txt
new file mode 100644
index 0000000..8943933
--- /dev/null
+++ b/doc/xs_shutdown.txt
@@ -0,0 +1,69 @@
+xs_bind(3)
+==========
+
+
+NAME
+----
+xs_shutdown - shut down part of the socket
+
+
+SYNOPSIS
+--------
+*int xs_shutdown (void '*socket', int 'how');*
+
+
+DESCRIPTION
+-----------
+This function partially closes the socket. It disconnects or unbinds an endpoint
+previously connected or bound by _xs_bind()_ or _xs_connect()_. 'how' parameter
+is the endpoint ID as returned by _xs_bind()_ or _xs_connect()_.
+
+Endpoint shutdown honours 'linger' socket option. I.e. if there are any pending
+outbound messages, Crossroads will try to push them to the network for the
+specified amount of time before giving up.
+
+Note: inproc endpoints don't support partial shutdown at the moment.
+
+RETURN VALUE
+------------
+The _xs_shutdown()_ function shall return zero if successful. Otherwise it
+shall return `-1` and set 'errno' to one of the values defined below.
+
+ERRORS
+------
+*EINVAL*::
+The endpoint ID supplied doesn't correspond to any active endpoint.
+*ENOTSUP*::
+Specified endpoint doesn't support partial shutdown.
+*ETERM*::
+The 'context' associated with the specified 'socket' was terminated.
+*ENOTSOCK*::
+The provided 'socket' was invalid.
+
+
+EXAMPLE
+-------
+.Binding socket to an endpoint, then unbinding it
+----
+/* Create a socket */
+void *socket = xs_socket (context, XS_PUB);
+assert (socket);
+/* Bind it to a TCP endpoint */
+int id = xs_bind (socket, "tcp://*:5555");
+assert (id != -1);
+/* Unbind the socket from the endpoint */
+rc = xs_shutdown (socket, id);
+assert (rc == 0);
+----
+
+
+SEE ALSO
+--------
+linkxs:xs_connect[3]
+linkxs:xs_bind[3]
+linkxs:xs[7]
+
+
+AUTHORS
+-------
+This manual page was written by Martin Sustrik <sustrik@250bpm.com>.
diff --git a/doc/xs_tcp.txt b/doc/xs_tcp.txt
index b97be08..7437e36 100644
--- a/doc/xs_tcp.txt
+++ b/doc/xs_tcp.txt
@@ -127,23 +127,23 @@ EXAMPLES
----
/* TCP port 5555 on all available interfaces */
rc = xs_bind(socket, "tcp://*:5555");
-assert (rc == 0);
+assert (rc != -1);
/* TCP port 5555 on the local loop-back interface on all platforms */
rc = xs_bind(socket, "tcp://127.0.0.1:5555");
-assert (rc == 0);
+assert (rc != -1);
/* TCP port 5555 on the first Ethernet network interface on Linux */
rc = xs_bind(socket, "tcp://eth0:5555");
-assert (rc == 0);
+assert (rc != -1);
----
.Connecting a socket
----
/* Connecting using an IP address */
rc = xs_connect(socket, "tcp://192.168.1.1:5555");
-assert (rc == 0);
+assert (rc != -1);
/* Connecting using a DNS name */
rc = xs_connect(socket, "tcp://server1:5555");
-assert (rc == 0);
+assert (rc != -1);
----
diff --git a/include/xs.h b/include/xs.h
index 16e479d..960ba20 100644
--- a/include/xs.h
+++ b/include/xs.h
@@ -224,6 +224,7 @@ XS_EXPORT int xs_getsockopt (void *s, int option, void *optval,
size_t *optvallen);
XS_EXPORT int xs_bind (void *s, const char *addr);
XS_EXPORT int xs_connect (void *s, const char *addr);
+XS_EXPORT int xs_shutdown (void *s, int how);
XS_EXPORT int xs_send (void *s, const void *buf, size_t len, int flags);
XS_EXPORT int xs_recv (void *s, void *buf, size_t len, int flags);
XS_EXPORT int xs_sendmsg (void *s, xs_msg_t *msg, int flags);
diff --git a/perf/inproc_lat.cpp b/perf/inproc_lat.cpp
index fc0b76b..54973db 100644
--- a/perf/inproc_lat.cpp
+++ b/perf/inproc_lat.cpp
@@ -55,7 +55,7 @@ static void *worker (void *ctx_)
}
rc = xs_connect (s, "inproc://lat_test");
- if (rc != 0) {
+ if (rc == -1) {
printf ("error in xs_connect: %s\n", xs_strerror (errno));
exit (1);
}
@@ -135,7 +135,7 @@ int main (int argc, char *argv [])
}
rc = xs_bind (s, "inproc://lat_test");
- if (rc != 0) {
+ if (rc == -1) {
printf ("error in xs_bind: %s\n", xs_strerror (errno));
return -1;
}
diff --git a/perf/inproc_thr.cpp b/perf/inproc_thr.cpp
index 91f1df4..570d87e 100644
--- a/perf/inproc_thr.cpp
+++ b/perf/inproc_thr.cpp
@@ -55,7 +55,7 @@ static void *worker (void *ctx_)
}
rc = xs_connect (s, "inproc://thr_test");
- if (rc != 0) {
+ if (rc == -1) {
printf ("error in xs_connect: %s\n", xs_strerror (errno));
exit (1);
}
@@ -134,7 +134,7 @@ int main (int argc, char *argv [])
}
rc = xs_bind (s, "inproc://thr_test");
- if (rc != 0) {
+ if (rc == -1) {
printf ("error in xs_bind: %s\n", xs_strerror (errno));
return -1;
}
diff --git a/perf/local_lat.cpp b/perf/local_lat.cpp
index 3dd1c8c..1fcf3f4 100644
--- a/perf/local_lat.cpp
+++ b/perf/local_lat.cpp
@@ -56,7 +56,7 @@ int main (int argc, char *argv [])
}
rc = xs_bind (s, bind_to);
- if (rc != 0) {
+ if (rc == -1) {
printf ("error in xs_bind: %s\n", xs_strerror (errno));
return -1;
}
diff --git a/perf/local_thr.cpp b/perf/local_thr.cpp
index 40fdbec..5a9e6f1 100644
--- a/perf/local_thr.cpp
+++ b/perf/local_thr.cpp
@@ -62,7 +62,7 @@ int main (int argc, char *argv [])
// For example XS_RATE, XS_RECOVERY_IVL and XS_MCAST_LOOP for PGM.
rc = xs_bind (s, bind_to);
- if (rc != 0) {
+ if (rc == -1) {
printf ("error in xs_bind: %s\n", xs_strerror (errno));
return -1;
}
diff --git a/perf/remote_lat.cpp b/perf/remote_lat.cpp
index 0f05d2a..6aaf1e9 100644
--- a/perf/remote_lat.cpp
+++ b/perf/remote_lat.cpp
@@ -60,7 +60,7 @@ int main (int argc, char *argv [])
}
rc = xs_connect (s, connect_to);
- if (rc != 0) {
+ if (rc == -1) {
printf ("error in xs_connect: %s\n", xs_strerror (errno));
return -1;
}
diff --git a/perf/remote_thr.cpp b/perf/remote_thr.cpp
index fe4bbfa..b93c4d7 100644
--- a/perf/remote_thr.cpp
+++ b/perf/remote_thr.cpp
@@ -60,7 +60,7 @@ int main (int argc, char *argv [])
// For example XS_RATE, XS_RECOVERY_IVL and XS_MCAST_LOOP for PGM.
rc = xs_connect (s, connect_to);
- if (rc != 0) {
+ if (rc == -1) {
printf ("error in xs_connect: %s\n", xs_strerror (errno));
return -1;
}
diff --git a/src/own.hpp b/src/own.hpp
index a2f0e9f..2d47a39 100644
--- a/src/own.hpp
+++ b/src/own.hpp
@@ -67,6 +67,12 @@ namespace xs
protected:
+ // Handlers for incoming commands.
+ void process_own (own_t *object_);
+ void process_term_req (own_t *object_);
+ void process_term_ack ();
+ void process_seqnum ();
+
// Launch the supplied object and become its owner.
void launch_child (own_t *object_);
@@ -101,12 +107,6 @@ namespace xs
// Set owner of the object
void set_owner (own_t *owner_);
- // Handlers for incoming commands.
- void process_own (own_t *object_);
- void process_term_req (own_t *object_);
- void process_term_ack ();
- void process_seqnum ();
-
// Check whether all the peding term acks were delivered.
// If so, deallocate this object.
void check_term_acks ();
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 3e57c28..ec5c8bd 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -347,7 +347,12 @@ int xs::socket_base_t::bind (const char *addr_)
if (protocol == "inproc") {
endpoint_t endpoint = {this, options};
rc = register_endpoint (addr_, endpoint);
- return rc;
+ if (rc != 0)
+ return -1;
+
+ // Endpoint IDs for inproc transport are not implemented at the
+ // moment. Thus we return 0 to the user.
+ return 0;
}
if (protocol == "pgm" || protocol == "epgm") {
@@ -373,7 +378,7 @@ int xs::socket_base_t::bind (const char *addr_)
return -1;
}
launch_child (listener);
- return 0;
+ return add_endpoint (listener);
}
#if !defined XS_HAVE_WINDOWS && !defined XS_HAVE_OPENVMS
@@ -387,7 +392,7 @@ int xs::socket_base_t::bind (const char *addr_)
return -1;
}
launch_child (listener);
- return 0;
+ return add_endpoint (listener);
}
#endif
@@ -478,6 +483,7 @@ int xs::socket_base_t::connect (const char *addr_)
// increased here.
send_bind (peer.socket, ppair [1], false);
+ // Inproc endpoints are not yet implemented thus we return 0.
return 0;
}
@@ -512,7 +518,32 @@ int xs::socket_base_t::connect (const char *addr_)
// Activate the session. Make it a child of this socket.
launch_child (session);
+ return add_endpoint (session);
+}
+
+int xs::socket_base_t::shutdown (int how_)
+{
+ // Check whether the library haven't been shut down yet.
+ if (unlikely (ctx_terminated)) {
+ errno = ETERM;
+ return -1;
+ }
+ // Endpoint ID means 'shutdown not implemented'.
+ if (how_ <= 0) {
+ errno = ENOTSUP;
+ return -1;
+ }
+
+ // Find the endpoint corresponding to the ID.
+ endpoints_t::iterator it = endpoints.find (how_);
+ if (it == endpoints.end ()) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ process_term_req (it->second);
+ endpoints.erase (it);
return 0;
}
@@ -937,3 +968,18 @@ uint64_t xs::socket_base_t::now_ms ()
{
return clock.now_ms ();
}
+
+int xs::socket_base_t::add_endpoint (own_t *endpoint_)
+{
+ // Get a unique endpoint ID.
+ int id = 1;
+ for (endpoints_t::iterator it = endpoints.begin (); it != endpoints.end ();
+ ++it, ++id)
+ if (it->first != id)
+ break;
+
+ // Remember the endpoint.
+ endpoints.insert (std::make_pair (id, endpoint_));
+ return id;
+}
+
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 54a5f0a..f73c413 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -23,6 +23,7 @@
#ifndef __XS_SOCKET_BASE_HPP_INCLUDED__
#define __XS_SOCKET_BASE_HPP_INCLUDED__
+#include <map>
#include <string>
#include "own.hpp"
@@ -71,6 +72,7 @@ namespace xs
int getsockopt (int option_, void *optval_, size_t *optvallen_);
int bind (const char *addr_);
int connect (const char *addr_);
+ int shutdown (int how_);
int send (xs::msg_t *msg_, int flags_);
int recv (xs::msg_t *msg_, int flags_);
int close ();
@@ -151,6 +153,9 @@ namespace xs
// to be later retrieved by getsockopt.
void extract_flags (msg_t *msg_);
+ // Creates new endpoint ID and adds the endpoint to the map.
+ int add_endpoint (own_t *endpoint_);
+
// Used to check whether the object is a socket.
uint32_t tag;
@@ -209,6 +214,10 @@ namespace xs
// Improves efficiency of time measurement.
clock_t clock;
+ // Map of open endpoints.
+ typedef std::map <int, own_t*> endpoints_t;
+ endpoints_t endpoints;
+
socket_base_t (const socket_base_t&);
const socket_base_t &operator = (const socket_base_t&);
};
diff --git a/src/xs.cpp b/src/xs.cpp
index 7f4cdd2..81daded 100644
--- a/src/xs.cpp
+++ b/src/xs.cpp
@@ -212,6 +212,17 @@ int xs_connect (void *s_, const char *addr_)
return rc;
}
+int xs_shutdown (void *s_, int how_)
+{
+ xs::socket_base_t *s = (xs::socket_base_t*) s_;
+ if (!s || !s->check_tag ()) {
+ errno = ENOTSOCK;
+ return -1;
+ }
+ int rc = s->shutdown (how_);
+ return rc;
+}
+
int xs_send (void *s_, const void *buf_, size_t len_, int flags_)
{
xs_msg_t msg;
diff --git a/src/xszmq.cpp b/src/xszmq.cpp
index d7199f9..f6e6fa1 100644
--- a/src/xszmq.cpp
+++ b/src/xszmq.cpp
@@ -355,12 +355,14 @@ int zmq_getsockopt (void *s, int option, void *optval,
int zmq_bind (void *s, const char *addr)
{
- return xs_bind (s, addr);
+ int rc = xs_bind (s, addr);
+ return rc < 0 ? -1 : 0;
}
int zmq_connect (void *s, const char *addr)
{
- return xs_connect (s, addr);
+ int rc = xs_connect (s, addr);
+ return rc < 0 ? -1 : 0;
}
int zmq_send (void *s, zmq_msg_t *msg, int flags)
diff --git a/tests/Makefile.am b/tests/Makefile.am
index ba76260..47a880b 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -24,7 +24,8 @@ noinst_PROGRAMS = pair_inproc \
wireformat \
libzmq21 \
resubscribe \
- survey
+ survey \
+ shutdown
pair_inproc_SOURCES = pair_inproc.cpp testutil.hpp
pair_tcp_SOURCES = pair_tcp.cpp testutil.hpp
@@ -48,5 +49,6 @@ wireformat_SOURCES = wireformat.cpp
libzmq21_SOURCES = libzmq21.cpp
resubscribe_SOURCES = resubscribe.cpp
survey_SOURCES = survey.cpp
+shutdown_SOURCES = shutdown.cpp
TESTS = $(noinst_PROGRAMS)
diff --git a/tests/hwm.cpp b/tests/hwm.cpp
index 6cd9a41..93e8910 100644
--- a/tests/hwm.cpp
+++ b/tests/hwm.cpp
@@ -35,14 +35,14 @@ int XS_TEST_MAIN ()
int rc = xs_setsockopt (sb, XS_RCVHWM, &hwm, sizeof (hwm));
assert (rc == 0);
rc = xs_bind (sb, "inproc://a");
- assert (rc == 0);
+ assert (rc != -1);
void *sc = xs_socket (ctx, XS_PUSH);
assert (sc);
rc = xs_setsockopt (sc, XS_SNDHWM, &hwm, sizeof (hwm));
assert (rc == 0);
rc = xs_connect (sc, "inproc://a");
- assert (rc == 0);
+ assert (rc != -1);
// Try to send 10 messages. Only 4 should succeed.
for (int i = 0; i < 10; i++)
diff --git a/tests/invalid_rep.cpp b/tests/invalid_rep.cpp
index cd0ad8e..93118b6 100644
--- a/tests/invalid_rep.cpp
+++ b/tests/invalid_rep.cpp
@@ -38,9 +38,9 @@ int XS_TEST_MAIN ()
rc = xs_setsockopt (req_socket, XS_LINGER, &linger, sizeof (int));
assert (rc == 0);
rc = xs_bind (xrep_socket, "inproc://hi");
- assert (rc == 0);
+ assert (rc != -1);
rc = xs_connect (req_socket, "inproc://hi");
- assert (rc == 0);
+ assert (rc != -1);
// Initial request.
rc = xs_send (req_socket, "r", 1, 0);
diff --git a/tests/libzmq21.cpp b/tests/libzmq21.cpp
index e7affae..ecf271b 100644
--- a/tests/libzmq21.cpp
+++ b/tests/libzmq21.cpp
@@ -57,7 +57,7 @@ int XS_TEST_MAIN ()
int rc = xs_setsockopt (pub, XS_PROTOCOL, &protocol, sizeof (protocol));
assert (rc == 0);
rc = xs_bind (pub, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
int oldsub = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
struct sockaddr_in address;
@@ -105,7 +105,7 @@ int XS_TEST_MAIN ()
rc = xs_setsockopt (sub, XS_SUBSCRIBE, "", 0);
assert (rc == 0);
rc = xs_bind (sub, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
int oldpub = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
address.sin_family = AF_INET;
diff --git a/tests/linger.cpp b/tests/linger.cpp
index fec0f72..1b1d341 100644
--- a/tests/linger.cpp
+++ b/tests/linger.cpp
@@ -37,7 +37,7 @@ int XS_TEST_MAIN ()
// Connect to non-existent endpoing.
assert (rc == 0);
rc = xs_connect (s, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
// Send a message.
rc = xs_send (s, "r", 1, 0);
diff --git a/tests/msg_flags.cpp b/tests/msg_flags.cpp
index eade40f..3f3b3b1 100644
--- a/tests/msg_flags.cpp
+++ b/tests/msg_flags.cpp
@@ -30,11 +30,11 @@ int XS_TEST_MAIN ()
void *sb = xs_socket (ctx, XS_XREP);
assert (sb);
int rc = xs_bind (sb, "inproc://a");
- assert (rc == 0);
+ assert (rc != -1);
void *sc = xs_socket (ctx, XS_XREQ);
assert (sc);
rc = xs_connect (sc, "inproc://a");
- assert (rc == 0);
+ assert (rc != -1);
// Send 2-part message.
rc = xs_send (sc, "A", 1, XS_SNDMORE);
diff --git a/tests/pair_inproc.cpp b/tests/pair_inproc.cpp
index c02d08b..456a371 100644
--- a/tests/pair_inproc.cpp
+++ b/tests/pair_inproc.cpp
@@ -30,12 +30,12 @@ int XS_TEST_MAIN ()
void *sb = xs_socket (ctx, XS_PAIR);
assert (sb);
int rc = xs_bind (sb, "inproc://a");
- assert (rc == 0);
+ assert (rc != -1);
void *sc = xs_socket (ctx, XS_PAIR);
assert (sc);
rc = xs_connect (sc, "inproc://a");
- assert (rc == 0);
+ assert (rc != -1);
bounce (sb, sc);
diff --git a/tests/pair_ipc.cpp b/tests/pair_ipc.cpp
index 43915cc..ef39bfc 100644
--- a/tests/pair_ipc.cpp
+++ b/tests/pair_ipc.cpp
@@ -37,12 +37,12 @@ int XS_TEST_MAIN ()
void *sb = xs_socket (ctx, XS_PAIR);
assert (sb);
int rc = xs_bind (sb, "ipc:///tmp/tester");
- assert (rc == 0);
+ assert (rc != -1);
void *sc = xs_socket (ctx, XS_PAIR);
assert (sc);
rc = xs_connect (sc, "ipc:///tmp/tester");
- assert (rc == 0);
+ assert (rc != -1);
bounce (sb, sc);
diff --git a/tests/pair_tcp.cpp b/tests/pair_tcp.cpp
index a0decb9..1df90ad 100644
--- a/tests/pair_tcp.cpp
+++ b/tests/pair_tcp.cpp
@@ -31,12 +31,12 @@ int XS_TEST_MAIN ()
void *sb = xs_socket (ctx, XS_PAIR);
assert (sb);
int rc = xs_bind (sb, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
void *sc = xs_socket (ctx, XS_PAIR);
assert (sc);
rc = xs_connect (sc, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
bounce (sb, sc);
diff --git a/tests/polltimeo.cpp b/tests/polltimeo.cpp
index ca88393..f425593 100644
--- a/tests/polltimeo.cpp
+++ b/tests/polltimeo.cpp
@@ -30,7 +30,7 @@ extern "C"
void *sc = xs_socket (ctx_, XS_PUSH);
assert (sc);
int rc = xs_connect (sc, "inproc://timeout_test");
- assert (rc == 0);
+ assert (rc != -1);
sleep (1);
rc = xs_close (sc);
assert (rc == 0);
@@ -48,7 +48,7 @@ int XS_TEST_MAIN ()
void *sb = xs_socket (ctx, XS_PULL);
assert (sb);
int rc = xs_bind (sb, "inproc://timeout_test");
- assert (rc == 0);
+ assert (rc != -1);
// Check whether timeout is honoured.
xs_pollitem_t pi;
diff --git a/tests/reconnect.cpp b/tests/reconnect.cpp
index c677527..ebbaf32 100644
--- a/tests/reconnect.cpp
+++ b/tests/reconnect.cpp
@@ -34,7 +34,7 @@ int XS_TEST_MAIN ()
// Connect before bind was done at the peer and send one message.
int rc = xs_connect (push, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
rc = xs_send (push, "ABC", 3, 0);
assert (rc == 3);
@@ -43,7 +43,7 @@ int XS_TEST_MAIN ()
// Bind the peer and get the message.
rc = xs_bind (pull, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
unsigned char buf [3];
rc = xs_recv (pull, buf, sizeof (buf), 0);
assert (rc == 3);
@@ -64,7 +64,7 @@ int XS_TEST_MAIN ()
// Connect before bind was done at the peer and send one message.
rc = xs_connect (push, "ipc:///tmp/tester");
- assert (rc == 0);
+ assert (rc != -1);
rc = xs_send (push, "ABC", 3, 0);
assert (rc == 3);
@@ -73,7 +73,7 @@ int XS_TEST_MAIN ()
// Bind the peer and get the message.
rc = xs_bind (pull, "ipc:///tmp/tester");
- assert (rc == 0);
+ assert (rc != -1);
rc = xs_recv (pull, buf, sizeof (buf), 0);
assert (rc == 3);
diff --git a/tests/reqrep_device.cpp b/tests/reqrep_device.cpp
index 81960d9..732f4dc 100644
--- a/tests/reqrep_device.cpp
+++ b/tests/reqrep_device.cpp
@@ -32,23 +32,23 @@ int XS_TEST_MAIN ()
void *xreq = xs_socket (ctx, XS_XREQ);
assert (xreq);
int rc = xs_bind (xreq, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
void *xrep = xs_socket (ctx, XS_XREP);
assert (xrep);
rc = xs_bind (xrep, "tcp://127.0.0.1:5561");
- assert (rc == 0);
+ assert (rc != -1);
// Create a worker.
void *rep = xs_socket (ctx, XS_REP);
assert (rep);
rc = xs_connect (rep, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
// Create a client.
void *req = xs_socket (ctx, XS_REQ);
assert (req);
rc = xs_connect (req, "tcp://127.0.0.1:5561");
- assert (rc == 0);
+ assert (rc != -1);
// Send a request.
rc = xs_send (req, "ABC", 3, XS_SNDMORE);
diff --git a/tests/reqrep_inproc.cpp b/tests/reqrep_inproc.cpp
index c66b82b..bd5cc92 100644
--- a/tests/reqrep_inproc.cpp
+++ b/tests/reqrep_inproc.cpp
@@ -30,12 +30,12 @@ int XS_TEST_MAIN ()
void *sb = xs_socket (ctx, XS_REP);
assert (sb);
int rc = xs_bind (sb, "inproc://a");
- assert (rc == 0);
+ assert (rc != -1);
void *sc = xs_socket (ctx, XS_REQ);
assert (sc);
rc = xs_connect (sc, "inproc://a");
- assert (rc == 0);
+ assert (rc != -1);
bounce (sb, sc);
diff --git a/tests/reqrep_ipc.cpp b/tests/reqrep_ipc.cpp
index 9be50fb..66315ef 100644
--- a/tests/reqrep_ipc.cpp
+++ b/tests/reqrep_ipc.cpp
@@ -37,12 +37,12 @@ int XS_TEST_MAIN ()
void *sb = xs_socket (ctx, XS_REP);
assert (sb);
int rc = xs_bind (sb, "ipc:///tmp/tester");
- assert (rc == 0);
+ assert (rc != -1);
void *sc = xs_socket (ctx, XS_REQ);
assert (sc);
rc = xs_connect (sc, "ipc:///tmp/tester");
- assert (rc == 0);
+ assert (rc != -1);
bounce (sb, sc);
diff --git a/tests/reqrep_tcp.cpp b/tests/reqrep_tcp.cpp
index b11d5a1..f68d762 100644
--- a/tests/reqrep_tcp.cpp
+++ b/tests/reqrep_tcp.cpp
@@ -31,12 +31,12 @@ int XS_TEST_MAIN ()
void *sb = xs_socket (ctx, XS_REP);
assert (sb);
int rc = xs_bind (sb, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
void *sc = xs_socket (ctx, XS_REQ);
assert (sc);
rc = xs_connect (sc, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
bounce (sb, sc);
diff --git a/tests/resubscribe.cpp b/tests/resubscribe.cpp
index 1924434..5d78712 100644
--- a/tests/resubscribe.cpp
+++ b/tests/resubscribe.cpp
@@ -34,13 +34,13 @@ int XS_TEST_MAIN ()
// Send two subscriptions upstream.
int rc = xs_bind (xpub, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
rc = xs_setsockopt (sub, XS_SUBSCRIBE, "a", 1);
assert (rc == 0);
rc = xs_setsockopt (sub, XS_SUBSCRIBE, "b", 1);
assert (rc == 0);
rc = xs_connect (sub, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
// Check whether subscriptions are correctly received.
char buf [5];
@@ -68,7 +68,7 @@ int XS_TEST_MAIN ()
xpub = xs_socket (ctx, XS_XPUB);
assert (xpub);
rc = xs_bind (xpub, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
// We have to give control to the SUB socket here so that it has
// chance to resend the subscriptions.
diff --git a/tests/shutdown.cpp b/tests/shutdown.cpp
new file mode 100644
index 0000000..9f055a1
--- /dev/null
+++ b/tests/shutdown.cpp
@@ -0,0 +1,103 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "testutil.hpp"
+
+int XS_TEST_MAIN ()
+{
+ int rc;
+ char buf [32];
+
+ fprintf (stderr, "shutdown test running...\n");
+
+ // Create infrastructure.
+ void *ctx = xs_init ();
+ assert (ctx);
+ void *push = xs_socket (ctx, XS_PUSH);
+ assert (push);
+ int push_id = xs_bind (push, "tcp://127.0.0.1:5560");
+ assert (push_id != -1);
+ void *pull = xs_socket (ctx, XS_PULL);
+ assert (pull);
+ rc = xs_connect (pull, "tcp://127.0.0.1:5560");
+ assert (rc != -1);
+
+ // Pass one message through to ensure the connection is established.
+ rc = xs_send (push, "ABC", 3, 0);
+ assert (rc == 3);
+ rc = xs_recv (pull, buf, sizeof (buf), 0);
+ assert (rc == 3);
+
+ // Shut down the bound endpoint.
+ rc = xs_shutdown (push, push_id);
+ assert (rc == 0);
+ sleep (1);
+
+ // Check that sending would block (there's no outbound connection).
+ rc = xs_send (push, "ABC", 3, XS_DONTWAIT);
+ assert (rc == -1 && xs_errno () == EAGAIN);
+
+ // Clean up.
+ rc = xs_close (pull);
+ assert (rc == 0);
+ rc = xs_close (push);
+ assert (rc == 0);
+ rc = xs_term (ctx);
+ assert (rc == 0);
+
+ // Now the other way round.
+
+ // Create infrastructure.
+ ctx = xs_init ();
+ assert (ctx);
+ pull = xs_socket (ctx, XS_PULL);
+ assert (pull);
+ rc = xs_bind (pull, "tcp://127.0.0.1:5560");
+ assert (rc != -1);
+ push = xs_socket (ctx, XS_PUSH);
+ assert (push);
+ push_id = xs_connect (push, "tcp://127.0.0.1:5560");
+ assert (push_id != -1);
+
+ // Pass one message through to ensure the connection is established.
+ rc = xs_send (push, "ABC", 3, 0);
+ assert (rc == 3);
+ rc = xs_recv (pull, buf, sizeof (buf), 0);
+ assert (rc == 3);
+
+ // Shut down the bound endpoint.
+ rc = xs_shutdown (push, push_id);
+ assert (rc == 0);
+ sleep (1);
+
+ // Check that sending would block (there's no outbound connection).
+ rc = xs_send (push, "ABC", 3, XS_DONTWAIT);
+ assert (rc == -1 && xs_errno () == EAGAIN);
+
+ // Clean up.
+ rc = xs_close (pull);
+ assert (rc == 0);
+ rc = xs_close (push);
+ assert (rc == 0);
+ rc = xs_term (ctx);
+ assert (rc == 0);
+
+ return 0;
+}
diff --git a/tests/shutdown_stress.cpp b/tests/shutdown_stress.cpp
index f106cc0..2b598a9 100644
--- a/tests/shutdown_stress.cpp
+++ b/tests/shutdown_stress.cpp
@@ -30,7 +30,7 @@ extern "C"
int rc;
rc = xs_connect (s_, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
// Start closing the socket while the connecting process is underway.
rc = xs_close (s_);
@@ -65,7 +65,7 @@ int XS_TEST_MAIN ()
assert (s1);
rc = xs_bind (s1, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
for (i = 0; i != THREAD_COUNT; i++) {
s2 = xs_socket (ctx, XS_SUB);
diff --git a/tests/sub_forward.cpp b/tests/sub_forward.cpp
index 6d385de..0237352 100644
--- a/tests/sub_forward.cpp
+++ b/tests/sub_forward.cpp
@@ -32,23 +32,23 @@ int XS_TEST_MAIN ()
void *xpub = xs_socket (ctx, XS_XPUB);
assert (xpub);
int rc = xs_bind (xpub, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
void *xsub = xs_socket (ctx, XS_XSUB);
assert (xsub);
rc = xs_bind (xsub, "tcp://127.0.0.1:5561");
- assert (rc == 0);
+ assert (rc != -1);
// Create a publisher.
void *pub = xs_socket (ctx, XS_PUB);
assert (pub);
rc = xs_connect (pub, "tcp://127.0.0.1:5561");
- assert (rc == 0);
+ assert (rc != -1);
// Create a subscriber.
void *sub = xs_socket (ctx, XS_SUB);
assert (sub);
rc = xs_connect (sub, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
// Subscribe for all messages.
rc = xs_setsockopt (sub, XS_SUBSCRIBE, "", 0);
diff --git a/tests/survey.cpp b/tests/survey.cpp
index 9347e2d..f21b217 100644
--- a/tests/survey.cpp
+++ b/tests/survey.cpp
@@ -33,23 +33,23 @@ int XS_TEST_MAIN ()
void *xsurveyor = xs_socket (ctx, XS_XSURVEYOR);
assert (xsurveyor);
rc = xs_bind (xsurveyor, "inproc://a");
- assert (rc == 0);
+ assert (rc != -1);
void *xrespondent = xs_socket (ctx, XS_XRESPONDENT);
assert (xrespondent);
rc = xs_bind (xrespondent, "inproc://b");
- assert (rc == 0);
+ assert (rc != -1);
void *surveyor = xs_socket (ctx, XS_SURVEYOR);
assert (surveyor);
rc = xs_connect (surveyor, "inproc://b");
- assert (rc == 0);
+ assert (rc != -1);
void *respondent1 = xs_socket (ctx, XS_RESPONDENT);
assert (respondent1);
rc = xs_connect (respondent1, "inproc://a");
- assert (rc == 0);
+ assert (rc != -1);
void *respondent2 = xs_socket (ctx, XS_RESPONDENT);
assert (respondent2);
rc = xs_connect (respondent2, "inproc://a");
- assert (rc == 0);
+ assert (rc != -1);
// Send the survey.
rc = xs_send (surveyor, "ABC", 3, 0);
diff --git a/tests/tests.cpp b/tests/tests.cpp
index 3d7951c..da034e6 100644
--- a/tests/tests.cpp
+++ b/tests/tests.cpp
@@ -115,6 +115,10 @@
#include "survey.cpp"
#undef XS_TEST_MAIN
+#define XS_TEST_MAIN shutdown
+#include "shutdown.cpp"
+#undef XS_TEST_MAIN
+
int main ()
{
int rc;
@@ -161,6 +165,8 @@ int main ()
assert (rc == 0);
rc = survey ();
assert (rc == 0);
+ rc = shutdown ()
+ assert (rc == 0);
fprintf (stderr, "SUCCESS\n");
sleep (1);
diff --git a/tests/timeo.cpp b/tests/timeo.cpp
index bc73eec..fb17c85 100644
--- a/tests/timeo.cpp
+++ b/tests/timeo.cpp
@@ -30,7 +30,7 @@ extern "C"
void *sc = xs_socket (ctx_, XS_PUSH);
assert (sc);
int rc = xs_connect (sc, "inproc://timeout_test");
- assert (rc == 0);
+ assert (rc != -1);
sleep (1);
rc = xs_close (sc);
assert (rc == 0);
@@ -48,7 +48,7 @@ int XS_TEST_MAIN ()
void *sb = xs_socket (ctx, XS_PULL);
assert (sb);
int rc = xs_bind (sb, "inproc://timeout_test");
- assert (rc == 0);
+ assert (rc != -1);
// Check whether non-blocking recv returns immediately.
char buf [] = "12345678ABCDEFGH12345678abcdefgh";
@@ -90,7 +90,7 @@ int XS_TEST_MAIN ()
rc = xs_setsockopt(sb, XS_SNDTIMEO, &timeout, timeout_size);
assert (rc == 0);
rc = xs_connect (sc, "inproc://timeout_test");
- assert (rc == 0);
+ assert (rc != -1);
rc = xs_send (sc, buf, 32, 0);
assert (rc == 32);
rc = xs_recv (sb, buf, 32, 0);
diff --git a/tests/wireformat.cpp b/tests/wireformat.cpp
index 1177798..f3e0f96 100644
--- a/tests/wireformat.cpp
+++ b/tests/wireformat.cpp
@@ -50,9 +50,9 @@ int XS_TEST_MAIN ()
// Bind the peer and get the message.
int rc = xs_bind (pull, "tcp://127.0.0.1:5560");
- assert (rc == 0);
+ assert (rc != -1);
rc = xs_bind (push, "tcp://127.0.0.1:5561");
- assert (rc == 0);
+ assert (rc != -1);
// Connect to the peer using raw sockets.
int rpush = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);