From 36fd87810274329c8cd86344b95a0521541e7bab Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 21 Apr 2012 07:07:57 +0200 Subject: xs_shutdown implemented This patch allows for partial shutdown of the socket. Signed-off-by: Martin Sustrik --- src/own.hpp | 12 ++++++------ src/socket_base.cpp | 52 +++++++++++++++++++++++++++++++++++++++++++++++++--- src/socket_base.hpp | 9 +++++++++ src/xs.cpp | 11 +++++++++++ src/xszmq.cpp | 6 ++++-- 5 files changed, 79 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/own.hpp b/src/own.hpp index a2f0e9f..2d47a39 100644 --- a/src/own.hpp +++ b/src/own.hpp @@ -67,6 +67,12 @@ namespace xs protected: + // Handlers for incoming commands. + void process_own (own_t *object_); + void process_term_req (own_t *object_); + void process_term_ack (); + void process_seqnum (); + // Launch the supplied object and become its owner. void launch_child (own_t *object_); @@ -101,12 +107,6 @@ namespace xs // Set owner of the object void set_owner (own_t *owner_); - // Handlers for incoming commands. - void process_own (own_t *object_); - void process_term_req (own_t *object_); - void process_term_ack (); - void process_seqnum (); - // Check whether all the peding term acks were delivered. // If so, deallocate this object. void check_term_acks (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 3e57c28..ec5c8bd 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -347,7 +347,12 @@ int xs::socket_base_t::bind (const char *addr_) if (protocol == "inproc") { endpoint_t endpoint = {this, options}; rc = register_endpoint (addr_, endpoint); - return rc; + if (rc != 0) + return -1; + + // Endpoint IDs for inproc transport are not implemented at the + // moment. Thus we return 0 to the user. + return 0; } if (protocol == "pgm" || protocol == "epgm") { @@ -373,7 +378,7 @@ int xs::socket_base_t::bind (const char *addr_) return -1; } launch_child (listener); - return 0; + return add_endpoint (listener); } #if !defined XS_HAVE_WINDOWS && !defined XS_HAVE_OPENVMS @@ -387,7 +392,7 @@ int xs::socket_base_t::bind (const char *addr_) return -1; } launch_child (listener); - return 0; + return add_endpoint (listener); } #endif @@ -478,6 +483,7 @@ int xs::socket_base_t::connect (const char *addr_) // increased here. send_bind (peer.socket, ppair [1], false); + // Inproc endpoints are not yet implemented thus we return 0. return 0; } @@ -512,7 +518,32 @@ int xs::socket_base_t::connect (const char *addr_) // Activate the session. Make it a child of this socket. launch_child (session); + return add_endpoint (session); +} + +int xs::socket_base_t::shutdown (int how_) +{ + // Check whether the library haven't been shut down yet. + if (unlikely (ctx_terminated)) { + errno = ETERM; + return -1; + } + // Endpoint ID means 'shutdown not implemented'. + if (how_ <= 0) { + errno = ENOTSUP; + return -1; + } + + // Find the endpoint corresponding to the ID. + endpoints_t::iterator it = endpoints.find (how_); + if (it == endpoints.end ()) { + errno = EINVAL; + return -1; + } + + process_term_req (it->second); + endpoints.erase (it); return 0; } @@ -937,3 +968,18 @@ uint64_t xs::socket_base_t::now_ms () { return clock.now_ms (); } + +int xs::socket_base_t::add_endpoint (own_t *endpoint_) +{ + // Get a unique endpoint ID. + int id = 1; + for (endpoints_t::iterator it = endpoints.begin (); it != endpoints.end (); + ++it, ++id) + if (it->first != id) + break; + + // Remember the endpoint. + endpoints.insert (std::make_pair (id, endpoint_)); + return id; +} + diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 54a5f0a..f73c413 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -23,6 +23,7 @@ #ifndef __XS_SOCKET_BASE_HPP_INCLUDED__ #define __XS_SOCKET_BASE_HPP_INCLUDED__ +#include #include #include "own.hpp" @@ -71,6 +72,7 @@ namespace xs int getsockopt (int option_, void *optval_, size_t *optvallen_); int bind (const char *addr_); int connect (const char *addr_); + int shutdown (int how_); int send (xs::msg_t *msg_, int flags_); int recv (xs::msg_t *msg_, int flags_); int close (); @@ -151,6 +153,9 @@ namespace xs // to be later retrieved by getsockopt. void extract_flags (msg_t *msg_); + // Creates new endpoint ID and adds the endpoint to the map. + int add_endpoint (own_t *endpoint_); + // Used to check whether the object is a socket. uint32_t tag; @@ -209,6 +214,10 @@ namespace xs // Improves efficiency of time measurement. clock_t clock; + // Map of open endpoints. + typedef std::map endpoints_t; + endpoints_t endpoints; + socket_base_t (const socket_base_t&); const socket_base_t &operator = (const socket_base_t&); }; diff --git a/src/xs.cpp b/src/xs.cpp index 7f4cdd2..81daded 100644 --- a/src/xs.cpp +++ b/src/xs.cpp @@ -212,6 +212,17 @@ int xs_connect (void *s_, const char *addr_) return rc; } +int xs_shutdown (void *s_, int how_) +{ + xs::socket_base_t *s = (xs::socket_base_t*) s_; + if (!s || !s->check_tag ()) { + errno = ENOTSOCK; + return -1; + } + int rc = s->shutdown (how_); + return rc; +} + int xs_send (void *s_, const void *buf_, size_t len_, int flags_) { xs_msg_t msg; diff --git a/src/xszmq.cpp b/src/xszmq.cpp index d7199f9..f6e6fa1 100644 --- a/src/xszmq.cpp +++ b/src/xszmq.cpp @@ -355,12 +355,14 @@ int zmq_getsockopt (void *s, int option, void *optval, int zmq_bind (void *s, const char *addr) { - return xs_bind (s, addr); + int rc = xs_bind (s, addr); + return rc < 0 ? -1 : 0; } int zmq_connect (void *s, const char *addr) { - return xs_connect (s, addr); + int rc = xs_connect (s, addr); + return rc < 0 ? -1 : 0; } int zmq_send (void *s, zmq_msg_t *msg, int flags) -- cgit v1.2.3