summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-04-21 07:07:57 +0200
committerMartin Sustrik <sustrik@250bpm.com>2012-04-21 07:07:57 +0200
commit36fd87810274329c8cd86344b95a0521541e7bab (patch)
treeec183c7dd3a9b1de3361e7211cbffc960d139cf1 /src
parentd26e86aa0afc3ec2534eacb4131aee8f6805c36a (diff)
xs_shutdown implemented
This patch allows for partial shutdown of the socket. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/own.hpp12
-rw-r--r--src/socket_base.cpp52
-rw-r--r--src/socket_base.hpp9
-rw-r--r--src/xs.cpp11
-rw-r--r--src/xszmq.cpp6
5 files changed, 79 insertions, 11 deletions
diff --git a/src/own.hpp b/src/own.hpp
index a2f0e9f..2d47a39 100644
--- a/src/own.hpp
+++ b/src/own.hpp
@@ -67,6 +67,12 @@ namespace xs
protected:
+ // Handlers for incoming commands.
+ void process_own (own_t *object_);
+ void process_term_req (own_t *object_);
+ void process_term_ack ();
+ void process_seqnum ();
+
// Launch the supplied object and become its owner.
void launch_child (own_t *object_);
@@ -101,12 +107,6 @@ namespace xs
// Set owner of the object
void set_owner (own_t *owner_);
- // Handlers for incoming commands.
- void process_own (own_t *object_);
- void process_term_req (own_t *object_);
- void process_term_ack ();
- void process_seqnum ();
-
// Check whether all the peding term acks were delivered.
// If so, deallocate this object.
void check_term_acks ();
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 3e57c28..ec5c8bd 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -347,7 +347,12 @@ int xs::socket_base_t::bind (const char *addr_)
if (protocol == "inproc") {
endpoint_t endpoint = {this, options};
rc = register_endpoint (addr_, endpoint);
- return rc;
+ if (rc != 0)
+ return -1;
+
+ // Endpoint IDs for inproc transport are not implemented at the
+ // moment. Thus we return 0 to the user.
+ return 0;
}
if (protocol == "pgm" || protocol == "epgm") {
@@ -373,7 +378,7 @@ int xs::socket_base_t::bind (const char *addr_)
return -1;
}
launch_child (listener);
- return 0;
+ return add_endpoint (listener);
}
#if !defined XS_HAVE_WINDOWS && !defined XS_HAVE_OPENVMS
@@ -387,7 +392,7 @@ int xs::socket_base_t::bind (const char *addr_)
return -1;
}
launch_child (listener);
- return 0;
+ return add_endpoint (listener);
}
#endif
@@ -478,6 +483,7 @@ int xs::socket_base_t::connect (const char *addr_)
// increased here.
send_bind (peer.socket, ppair [1], false);
+ // Inproc endpoints are not yet implemented thus we return 0.
return 0;
}
@@ -512,7 +518,32 @@ int xs::socket_base_t::connect (const char *addr_)
// Activate the session. Make it a child of this socket.
launch_child (session);
+ return add_endpoint (session);
+}
+
+int xs::socket_base_t::shutdown (int how_)
+{
+ // Check whether the library haven't been shut down yet.
+ if (unlikely (ctx_terminated)) {
+ errno = ETERM;
+ return -1;
+ }
+ // Endpoint ID means 'shutdown not implemented'.
+ if (how_ <= 0) {
+ errno = ENOTSUP;
+ return -1;
+ }
+
+ // Find the endpoint corresponding to the ID.
+ endpoints_t::iterator it = endpoints.find (how_);
+ if (it == endpoints.end ()) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ process_term_req (it->second);
+ endpoints.erase (it);
return 0;
}
@@ -937,3 +968,18 @@ uint64_t xs::socket_base_t::now_ms ()
{
return clock.now_ms ();
}
+
+int xs::socket_base_t::add_endpoint (own_t *endpoint_)
+{
+ // Get a unique endpoint ID.
+ int id = 1;
+ for (endpoints_t::iterator it = endpoints.begin (); it != endpoints.end ();
+ ++it, ++id)
+ if (it->first != id)
+ break;
+
+ // Remember the endpoint.
+ endpoints.insert (std::make_pair (id, endpoint_));
+ return id;
+}
+
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 54a5f0a..f73c413 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -23,6 +23,7 @@
#ifndef __XS_SOCKET_BASE_HPP_INCLUDED__
#define __XS_SOCKET_BASE_HPP_INCLUDED__
+#include <map>
#include <string>
#include "own.hpp"
@@ -71,6 +72,7 @@ namespace xs
int getsockopt (int option_, void *optval_, size_t *optvallen_);
int bind (const char *addr_);
int connect (const char *addr_);
+ int shutdown (int how_);
int send (xs::msg_t *msg_, int flags_);
int recv (xs::msg_t *msg_, int flags_);
int close ();
@@ -151,6 +153,9 @@ namespace xs
// to be later retrieved by getsockopt.
void extract_flags (msg_t *msg_);
+ // Creates new endpoint ID and adds the endpoint to the map.
+ int add_endpoint (own_t *endpoint_);
+
// Used to check whether the object is a socket.
uint32_t tag;
@@ -209,6 +214,10 @@ namespace xs
// Improves efficiency of time measurement.
clock_t clock;
+ // Map of open endpoints.
+ typedef std::map <int, own_t*> endpoints_t;
+ endpoints_t endpoints;
+
socket_base_t (const socket_base_t&);
const socket_base_t &operator = (const socket_base_t&);
};
diff --git a/src/xs.cpp b/src/xs.cpp
index 7f4cdd2..81daded 100644
--- a/src/xs.cpp
+++ b/src/xs.cpp
@@ -212,6 +212,17 @@ int xs_connect (void *s_, const char *addr_)
return rc;
}
+int xs_shutdown (void *s_, int how_)
+{
+ xs::socket_base_t *s = (xs::socket_base_t*) s_;
+ if (!s || !s->check_tag ()) {
+ errno = ENOTSOCK;
+ return -1;
+ }
+ int rc = s->shutdown (how_);
+ return rc;
+}
+
int xs_send (void *s_, const void *buf_, size_t len_, int flags_)
{
xs_msg_t msg;
diff --git a/src/xszmq.cpp b/src/xszmq.cpp
index d7199f9..f6e6fa1 100644
--- a/src/xszmq.cpp
+++ b/src/xszmq.cpp
@@ -355,12 +355,14 @@ int zmq_getsockopt (void *s, int option, void *optval,
int zmq_bind (void *s, const char *addr)
{
- return xs_bind (s, addr);
+ int rc = xs_bind (s, addr);
+ return rc < 0 ? -1 : 0;
}
int zmq_connect (void *s, const char *addr)
{
- return xs_connect (s, addr);
+ int rc = xs_connect (s, addr);
+ return rc < 0 ? -1 : 0;
}
int zmq_send (void *s, zmq_msg_t *msg, int flags)