diff options
47 files changed, 355 insertions, 98 deletions
| @@ -50,6 +50,7 @@ tests/wireformat  tests/libzmq21  tests/resubscribe  tests/survey +tests/shutdown  src/platform.hpp*  src/stamp-h1  perf/*.exe diff --git a/builds/msvc/tests/tests.vcxproj b/builds/msvc/tests/tests.vcxproj index d1258e4..20d2554 100644 --- a/builds/msvc/tests/tests.vcxproj +++ b/builds/msvc/tests/tests.vcxproj @@ -155,6 +155,10 @@        <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>        <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>      </ClCompile> +    <ClCompile Include="..\..\..\tests\shutdown.cpp"> +      <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild> +      <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild> +    </ClCompile>      <ClCompile Include="..\..\..\tests\tests.cpp" />      <ClCompile Include="..\..\..\tests\timeo.cpp">        <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild> @@ -188,4 +192,4 @@    <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />    <ImportGroup Label="ExtensionTargets">    </ImportGroup> -</Project>
\ No newline at end of file +</Project> diff --git a/builds/msvc/tests/tests.vcxproj.filters b/builds/msvc/tests/tests.vcxproj.filters index 0fb41a8..ea97fed 100644 --- a/builds/msvc/tests/tests.vcxproj.filters +++ b/builds/msvc/tests/tests.vcxproj.filters @@ -68,6 +68,9 @@      <ClCompile Include="..\..\..\tests\survey.cpp">        <Filter>Header Files</Filter>      </ClCompile> +    <ClCompile Include="..\..\..\tests\shutdown.cpp"> +      <Filter>Header Files</Filter> +    </ClCompile>    </ItemGroup>    <ItemGroup>      <Filter Include="Header Files"> @@ -79,4 +82,4 @@        <Filter>Header Files</Filter>      </ClInclude>    </ItemGroup> -</Project>
\ No newline at end of file +</Project> diff --git a/doc/Makefile.am b/doc/Makefile.am index 66dd87b..9f90feb 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -3,7 +3,7 @@ MAN3 = xs_bind.3 xs_close.3 xs_connect.3 xs_init.3 \      xs_msg_init_data.3 xs_msg_init_size.3 xs_msg_move.3 xs_msg_size.3 \      xs_poll.3 xs_recv.3 xs_send.3 xs_setsockopt.3 xs_socket.3 \      xs_strerror.3 xs_term.3 xs_version.3 xs_getsockopt.3 xs_errno.3 \ -    xs_sendmsg.3 xs_recvmsg.3 xs_getmsgopt.3 xs_setctxopt.3 +    xs_sendmsg.3 xs_recvmsg.3 xs_getmsgopt.3 xs_setctxopt.3 xs_shutdown.3  MAN7 = xs.7 xs_tcp.7 xs_pgm.7 xs_inproc.7 xs_ipc.7 xs_zmq.7  MAN_DOC = $(MAN1) $(MAN3) $(MAN7) diff --git a/doc/xs_bind.txt b/doc/xs_bind.txt index fc08124..3472f04 100644 --- a/doc/xs_bind.txt +++ b/doc/xs_bind.txt @@ -38,8 +38,8 @@ semantics involved when connecting or binding a socket to multiple endpoints.  RETURN VALUE  ------------ -The _xs_bind()_ function shall return zero if successful. Otherwise it shall -return `-1` and set 'errno' to one of the values defined below. +The _xs_bind()_ function shall return endpoint ID if successful. Otherwise it +shall return `-1` and set 'errno' to one of the values defined below.  ERRORS @@ -71,10 +71,10 @@ void *socket = xs_socket (context, XS_PUB);  assert (socket);  /* Bind it to a in-process transport with the address 'my_publisher' */  int rc = xs_bind (socket, "inproc://my_publisher"); -assert (rc == 0); +assert (rc != -1);  /* Bind it to a TCP transport on port 5555 of the 'eth0' interface */  rc = xs_bind (socket, "tcp://eth0:5555"); -assert (rc == 0); +assert (rc != -1);  ---- diff --git a/doc/xs_connect.txt b/doc/xs_connect.txt index 898b915..02597da 100644 --- a/doc/xs_connect.txt +++ b/doc/xs_connect.txt @@ -42,8 +42,8 @@ physical connection was or can actually be established.  RETURN VALUE  ------------ -The _xs_connect()_ function shall return zero if successful. Otherwise it -shall return `-1` and set 'errno' to one of the values defined below. +The _xs_connect()_ function shall return endpoint ID if successful. Otherwise +it shall return `-1` and set 'errno' to one of the values defined below.  ERRORS @@ -69,10 +69,10 @@ void *socket = xs_socket (context, XS_SUB);  assert (socket);  /* Connect it to an in-process transport with the address 'my_publisher' */  int rc = xs_connect (socket, "inproc://my_publisher"); -assert (rc == 0); +assert (rc != -1);  /* Connect it to the host server001, port 5555 using a TCP transport */  rc = xs_connect (socket, "tcp://server001:5555"); -assert (rc == 0); +assert (rc != -1);  ---- diff --git a/doc/xs_inproc.txt b/doc/xs_inproc.txt index 9ec255f..f0dae94 100644 --- a/doc/xs_inproc.txt +++ b/doc/xs_inproc.txt @@ -50,20 +50,20 @@ EXAMPLES  ----  /* Assign the in-process name "#1" */  rc = xs_bind(socket, "inproc://#1"); -assert (rc == 0); +assert (rc != -1);  /* Assign the in-process name "my-endpoint" */  rc = xs_bind(socket, "inproc://my-endpoint"); -assert (rc == 0); +assert (rc != -1);  ----  .Connecting a socket  ----  /* Connect to the in-process name "#1" */  rc = xs_connect(socket, "inproc://#1"); -assert (rc == 0); +assert (rc != -1);  /* Connect to the in-process name "my-endpoint" */  rc = xs_connect(socket, "inproc://my-endpoint"); -assert (rc == 0); +assert (rc != -1);  ---- diff --git a/doc/xs_ipc.txt b/doc/xs_ipc.txt index f3c5147..b917c78 100644 --- a/doc/xs_ipc.txt +++ b/doc/xs_ipc.txt @@ -54,14 +54,14 @@ EXAMPLES  ----  /* Assign the pathname "/tmp/feeds/0" */  rc = xs_bind(socket, "ipc:///tmp/feeds/0"); -assert (rc == 0); +assert (rc != -1);  ----  .Connecting a socket  ----  /* Connect to the pathname "/tmp/feeds/0" */  rc = xs_connect(socket, "ipc:///tmp/feeds/0"); -assert (rc == 0); +assert (rc != -1);  ----  SEE ALSO diff --git a/doc/xs_pgm.txt b/doc/xs_pgm.txt index fcce19a..2fc6ca8 100644 --- a/doc/xs_pgm.txt +++ b/doc/xs_pgm.txt @@ -137,12 +137,12 @@ EXAMPLE  /* using the first Ethernet network interface on Linux */  /* and the Encapsulated PGM protocol */  rc = xs_connect(socket, "epgm://eth0;239.192.1.1:5555"); -assert (rc == 0); +assert (rc != -1);  /* Connecting to the multicast address 239.192.1.1, port 5555, */  /* using the network interface with the address 192.168.1.1 */  /* and the standard PGM protocol */  rc = xs_connect(socket, "pgm://192.168.1.1;239.192.1.1:5555"); -assert (rc == 0); +assert (rc != -1);  ---- diff --git a/doc/xs_setsockopt.txt b/doc/xs_setsockopt.txt index e07d9dd..d636c83 100644 --- a/doc/xs_setsockopt.txt +++ b/doc/xs_setsockopt.txt @@ -397,15 +397,15 @@ int64_t affinity;  /* Incoming connections on TCP port 5555 shall be handled by I/O thread 1 */  affinity = 1;  rc = xs_setsockopt (socket, XS_AFFINITY, &affinity, sizeof affinity); -assert (rc); +assert (rc == 0);  rc = xs_bind (socket, "tcp://lo:5555"); -assert (rc); +assert (rc != -1);  /* Incoming connections on TCP port 5556 shall be handled by I/O thread 2 */  affinity = 2;  rc = xs_setsockopt (socket, XS_AFFINITY, &affinity, sizeof affinity); -assert (rc); +assert (rc == 0);  rc = xs_bind (socket, "tcp://lo:5556"); -assert (rc); +assert (rc != -1);  ---- diff --git a/doc/xs_shutdown.txt b/doc/xs_shutdown.txt new file mode 100644 index 0000000..8943933 --- /dev/null +++ b/doc/xs_shutdown.txt @@ -0,0 +1,69 @@ +xs_bind(3) +========== + + +NAME +---- +xs_shutdown - shut down part of the socket + + +SYNOPSIS +-------- +*int xs_shutdown (void '*socket', int 'how');* + + +DESCRIPTION +----------- +This function partially closes the socket. It disconnects or unbinds an endpoint +previously connected or bound by _xs_bind()_ or _xs_connect()_. 'how' parameter +is the endpoint ID as returned by _xs_bind()_ or _xs_connect()_. + +Endpoint shutdown honours 'linger' socket option. I.e. if there are any pending +outbound messages, Crossroads will try to push them to the network for the +specified amount of time before giving up. + +Note: inproc endpoints don't support partial shutdown at the moment. + +RETURN VALUE +------------ +The _xs_shutdown()_ function shall return zero if successful. Otherwise it +shall return `-1` and set 'errno' to one of the values defined below. + +ERRORS +------ +*EINVAL*:: +The endpoint ID supplied doesn't correspond to any active endpoint. +*ENOTSUP*:: +Specified endpoint doesn't support partial shutdown. +*ETERM*:: +The 'context' associated with the specified 'socket' was terminated. +*ENOTSOCK*:: +The provided 'socket' was invalid. + + +EXAMPLE +------- +.Binding socket to an endpoint, then unbinding it +---- +/* Create a socket */ +void *socket = xs_socket (context, XS_PUB); +assert (socket); +/* Bind it to a TCP endpoint */ +int id = xs_bind (socket, "tcp://*:5555"); +assert (id != -1); +/* Unbind the socket from the endpoint */ +rc = xs_shutdown (socket, id); +assert (rc == 0); +---- + + +SEE ALSO +-------- +linkxs:xs_connect[3] +linkxs:xs_bind[3] +linkxs:xs[7] + + +AUTHORS +------- +This manual page was written by Martin Sustrik <sustrik@250bpm.com>. diff --git a/doc/xs_tcp.txt b/doc/xs_tcp.txt index b97be08..7437e36 100644 --- a/doc/xs_tcp.txt +++ b/doc/xs_tcp.txt @@ -127,23 +127,23 @@ EXAMPLES  ----  /* TCP port 5555 on all available interfaces */  rc = xs_bind(socket, "tcp://*:5555"); -assert (rc == 0); +assert (rc != -1);  /* TCP port 5555 on the local loop-back interface on all platforms */  rc = xs_bind(socket, "tcp://127.0.0.1:5555"); -assert (rc == 0); +assert (rc != -1);  /* TCP port 5555 on the first Ethernet network interface on Linux */  rc = xs_bind(socket, "tcp://eth0:5555"); -assert (rc == 0); +assert (rc != -1);  ----  .Connecting a socket  ----  /* Connecting using an IP address */  rc = xs_connect(socket, "tcp://192.168.1.1:5555"); -assert (rc == 0); +assert (rc != -1);  /* Connecting using a DNS name */  rc = xs_connect(socket, "tcp://server1:5555"); -assert (rc == 0); +assert (rc != -1);  ---- diff --git a/include/xs.h b/include/xs.h index 16e479d..960ba20 100644 --- a/include/xs.h +++ b/include/xs.h @@ -224,6 +224,7 @@ XS_EXPORT int xs_getsockopt (void *s, int option, void *optval,      size_t *optvallen);  XS_EXPORT int xs_bind (void *s, const char *addr);  XS_EXPORT int xs_connect (void *s, const char *addr); +XS_EXPORT int xs_shutdown (void *s, int how);  XS_EXPORT int xs_send (void *s, const void *buf, size_t len, int flags);  XS_EXPORT int xs_recv (void *s, void *buf, size_t len, int flags);  XS_EXPORT int xs_sendmsg (void *s, xs_msg_t *msg, int flags); diff --git a/perf/inproc_lat.cpp b/perf/inproc_lat.cpp index fc0b76b..54973db 100644 --- a/perf/inproc_lat.cpp +++ b/perf/inproc_lat.cpp @@ -55,7 +55,7 @@ static void *worker (void *ctx_)      }      rc = xs_connect (s, "inproc://lat_test"); -    if (rc != 0) { +    if (rc == -1) {          printf ("error in xs_connect: %s\n", xs_strerror (errno));          exit (1);      } @@ -135,7 +135,7 @@ int main (int argc, char *argv [])      }      rc = xs_bind (s, "inproc://lat_test"); -    if (rc != 0) { +    if (rc == -1) {          printf ("error in xs_bind: %s\n", xs_strerror (errno));          return -1;      } diff --git a/perf/inproc_thr.cpp b/perf/inproc_thr.cpp index 91f1df4..570d87e 100644 --- a/perf/inproc_thr.cpp +++ b/perf/inproc_thr.cpp @@ -55,7 +55,7 @@ static void *worker (void *ctx_)      }      rc = xs_connect (s, "inproc://thr_test"); -    if (rc != 0) { +    if (rc == -1) {          printf ("error in xs_connect: %s\n", xs_strerror (errno));          exit (1);      } @@ -134,7 +134,7 @@ int main (int argc, char *argv [])      }      rc = xs_bind (s, "inproc://thr_test"); -    if (rc != 0) { +    if (rc == -1) {          printf ("error in xs_bind: %s\n", xs_strerror (errno));          return -1;      } diff --git a/perf/local_lat.cpp b/perf/local_lat.cpp index 3dd1c8c..1fcf3f4 100644 --- a/perf/local_lat.cpp +++ b/perf/local_lat.cpp @@ -56,7 +56,7 @@ int main (int argc, char *argv [])      }      rc = xs_bind (s, bind_to); -    if (rc != 0) { +    if (rc == -1) {          printf ("error in xs_bind: %s\n", xs_strerror (errno));          return -1;      } diff --git a/perf/local_thr.cpp b/perf/local_thr.cpp index 40fdbec..5a9e6f1 100644 --- a/perf/local_thr.cpp +++ b/perf/local_thr.cpp @@ -62,7 +62,7 @@ int main (int argc, char *argv [])      //  For example XS_RATE, XS_RECOVERY_IVL and XS_MCAST_LOOP for PGM.      rc = xs_bind (s, bind_to); -    if (rc != 0) { +    if (rc == -1) {          printf ("error in xs_bind: %s\n", xs_strerror (errno));          return -1;      } diff --git a/perf/remote_lat.cpp b/perf/remote_lat.cpp index 0f05d2a..6aaf1e9 100644 --- a/perf/remote_lat.cpp +++ b/perf/remote_lat.cpp @@ -60,7 +60,7 @@ int main (int argc, char *argv [])      }      rc = xs_connect (s, connect_to); -    if (rc != 0) { +    if (rc == -1) {          printf ("error in xs_connect: %s\n", xs_strerror (errno));          return -1;      } diff --git a/perf/remote_thr.cpp b/perf/remote_thr.cpp index fe4bbfa..b93c4d7 100644 --- a/perf/remote_thr.cpp +++ b/perf/remote_thr.cpp @@ -60,7 +60,7 @@ int main (int argc, char *argv [])      //  For example XS_RATE, XS_RECOVERY_IVL and XS_MCAST_LOOP for PGM.      rc = xs_connect (s, connect_to); -    if (rc != 0) { +    if (rc == -1) {          printf ("error in xs_connect: %s\n", xs_strerror (errno));          return -1;      } diff --git a/src/own.hpp b/src/own.hpp index a2f0e9f..2d47a39 100644 --- a/src/own.hpp +++ b/src/own.hpp @@ -67,6 +67,12 @@ namespace xs      protected: +        //  Handlers for incoming commands. +        void process_own (own_t *object_); +        void process_term_req (own_t *object_); +        void process_term_ack (); +        void process_seqnum (); +          //  Launch the supplied object and become its owner.          void launch_child (own_t *object_); @@ -101,12 +107,6 @@ namespace xs          //  Set owner of the object          void set_owner (own_t *owner_); -        //  Handlers for incoming commands. -        void process_own (own_t *object_); -        void process_term_req (own_t *object_); -        void process_term_ack (); -        void process_seqnum (); -          //  Check whether all the peding term acks were delivered.          //  If so, deallocate this object.          void check_term_acks (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 3e57c28..ec5c8bd 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -347,7 +347,12 @@ int xs::socket_base_t::bind (const char *addr_)      if (protocol == "inproc") {          endpoint_t endpoint = {this, options};          rc = register_endpoint (addr_, endpoint); -        return rc; +        if (rc != 0) +            return -1; + +        //  Endpoint IDs for inproc transport are not implemented at the +        //  moment. Thus we return 0 to the user. +        return 0;      }      if (protocol == "pgm" || protocol == "epgm") { @@ -373,7 +378,7 @@ int xs::socket_base_t::bind (const char *addr_)              return -1;          }          launch_child (listener); -        return 0; +        return add_endpoint (listener);      }  #if !defined XS_HAVE_WINDOWS && !defined XS_HAVE_OPENVMS @@ -387,7 +392,7 @@ int xs::socket_base_t::bind (const char *addr_)              return -1;          }          launch_child (listener); -        return 0; +        return add_endpoint (listener);      }  #endif @@ -478,6 +483,7 @@ int xs::socket_base_t::connect (const char *addr_)          //  increased here.          send_bind (peer.socket, ppair [1], false); +        //  Inproc endpoints are not yet implemented thus we return 0.          return 0;      } @@ -512,7 +518,32 @@ int xs::socket_base_t::connect (const char *addr_)      //  Activate the session. Make it a child of this socket.      launch_child (session); +    return add_endpoint (session); +} + +int xs::socket_base_t::shutdown (int how_) +{ +    //  Check whether the library haven't been shut down yet. +    if (unlikely (ctx_terminated)) { +        errno = ETERM; +        return -1; +    } +    //  Endpoint ID means 'shutdown not implemented'. +    if (how_ <= 0) { +        errno = ENOTSUP; +        return -1; +    } + +    //  Find the endpoint corresponding to the ID. +    endpoints_t::iterator it = endpoints.find (how_); +    if (it == endpoints.end ()) { +        errno = EINVAL; +        return -1; +    } + +    process_term_req (it->second); +    endpoints.erase (it);      return 0;  } @@ -937,3 +968,18 @@ uint64_t xs::socket_base_t::now_ms ()  {      return clock.now_ms ();  } + +int xs::socket_base_t::add_endpoint (own_t *endpoint_) +{ +    //  Get a unique endpoint ID. +    int id = 1; +    for (endpoints_t::iterator it = endpoints.begin (); it != endpoints.end (); +          ++it, ++id) +        if (it->first != id) +            break; + +    //  Remember the endpoint. +    endpoints.insert (std::make_pair (id, endpoint_)); +    return id; +} + diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 54a5f0a..f73c413 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -23,6 +23,7 @@  #ifndef __XS_SOCKET_BASE_HPP_INCLUDED__  #define __XS_SOCKET_BASE_HPP_INCLUDED__ +#include <map>  #include <string>  #include "own.hpp" @@ -71,6 +72,7 @@ namespace xs          int getsockopt (int option_, void *optval_, size_t *optvallen_);          int bind (const char *addr_);          int connect (const char *addr_); +        int shutdown (int how_);          int send (xs::msg_t *msg_, int flags_);          int recv (xs::msg_t *msg_, int flags_);          int close (); @@ -151,6 +153,9 @@ namespace xs          //  to be later retrieved by getsockopt.          void extract_flags (msg_t *msg_); +        //  Creates new endpoint ID and adds the endpoint to the map. +        int add_endpoint (own_t *endpoint_); +          //  Used to check whether the object is a socket.          uint32_t tag; @@ -209,6 +214,10 @@ namespace xs          //  Improves efficiency of time measurement.          clock_t clock; +        //   Map of open endpoints. +        typedef std::map <int, own_t*> endpoints_t; +        endpoints_t endpoints; +          socket_base_t (const socket_base_t&);          const socket_base_t &operator = (const socket_base_t&);      }; @@ -212,6 +212,17 @@ int xs_connect (void *s_, const char *addr_)      return rc;  } +int xs_shutdown (void *s_, int how_) +{ +    xs::socket_base_t *s = (xs::socket_base_t*) s_; +    if (!s || !s->check_tag ()) { +        errno = ENOTSOCK; +        return -1; +    } +    int rc = s->shutdown (how_); +    return rc;      +} +  int xs_send (void *s_, const void *buf_, size_t len_, int flags_)  {      xs_msg_t msg; diff --git a/src/xszmq.cpp b/src/xszmq.cpp index d7199f9..f6e6fa1 100644 --- a/src/xszmq.cpp +++ b/src/xszmq.cpp @@ -355,12 +355,14 @@ int zmq_getsockopt (void *s, int option, void *optval,  int zmq_bind (void *s, const char *addr)  { -    return xs_bind (s, addr); +    int rc = xs_bind (s, addr); +    return rc < 0 ? -1 : 0;  }  int zmq_connect (void *s, const char *addr)  { -    return xs_connect (s, addr); +    int rc = xs_connect (s, addr); +    return rc < 0 ? -1 : 0;  }  int zmq_send (void *s, zmq_msg_t *msg, int flags) diff --git a/tests/Makefile.am b/tests/Makefile.am index ba76260..47a880b 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -24,7 +24,8 @@ noinst_PROGRAMS = pair_inproc \                    wireformat \                    libzmq21 \                    resubscribe \ -                  survey +                  survey \ +                  shutdown  pair_inproc_SOURCES = pair_inproc.cpp testutil.hpp  pair_tcp_SOURCES = pair_tcp.cpp testutil.hpp @@ -48,5 +49,6 @@ wireformat_SOURCES = wireformat.cpp  libzmq21_SOURCES = libzmq21.cpp  resubscribe_SOURCES = resubscribe.cpp  survey_SOURCES = survey.cpp +shutdown_SOURCES = shutdown.cpp  TESTS = $(noinst_PROGRAMS) diff --git a/tests/hwm.cpp b/tests/hwm.cpp index 6cd9a41..93e8910 100644 --- a/tests/hwm.cpp +++ b/tests/hwm.cpp @@ -35,14 +35,14 @@ int XS_TEST_MAIN ()      int rc = xs_setsockopt (sb, XS_RCVHWM, &hwm, sizeof (hwm));      assert (rc == 0); | 
