summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:08:43 +0900
committerMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:08:43 +0900
commit1e01248efc113cc9389f795157400a634730823e (patch)
tree5b321c0540001b6c1f7eab659e4cefec67dfcf96 /src
parenta55458399f9e54f8384eda174d405ee85d490c45 (diff)
XS_CTX_REENTRANT option added
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/ctx.cpp21
-rw-r--r--src/ctx.hpp6
-rw-r--r--src/mutex.hpp52
-rw-r--r--src/socket_base.cpp140
-rw-r--r--src/socket_base.hpp4
5 files changed, 181 insertions, 42 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp
index b6298e2..917c3e6 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -43,7 +43,8 @@ xs::ctx_t::ctx_t (uint32_t io_threads_) :
starting (true),
terminating (false),
max_sockets (512),
- io_thread_count (io_threads_)
+ io_thread_count (io_threads_),
+ reentrant (false)
{
}
@@ -147,6 +148,16 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_)
max_sockets = *((int*) optval_);
opt_sync.unlock ();
break;
+ case XS_CTX_REENTRANT:
+ if (optvallen_ != sizeof (int) || (*((int*) optval_) != 0 &&
+ *((int*) optval_) != 1)) {
+ errno = EINVAL;
+ return -1;
+ }
+ opt_sync.lock ();
+ reentrant = (*((int*) optval_) ? true : false);
+ opt_sync.unlock ();
+ break;
default:
errno = EINVAL;
return -1;
@@ -379,6 +390,14 @@ void xs::ctx_t::publish_logs (const char *text_)
log_sync.unlock ();
}
+bool xs::ctx_t::is_reentrant ()
+{
+ opt_sync.lock ();
+ bool ret = reentrant;
+ opt_sync.unlock ();
+ return ret;
+}
+
// The last used socket ID, or 0 if no socket was used so far. Note that this
// is a global variable. Thus, even sockets created in different contexts have
// unique IDs.
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 56b5d4c..e912443 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -100,6 +100,9 @@ namespace xs
void log (int sid_, const char *text_);
void publish_logs (const char *text_);
+ // True, if API is expected to be reentrant.
+ bool is_reentrant ();
+
enum {
term_tid = 0,
reaper_tid = 1
@@ -173,6 +176,9 @@ namespace xs
// Number of I/O threads to launch.
uint32_t io_thread_count;
+ // True, if API is expected to be reentrant.
+ bool reentrant;
+
// Synchronisation of access to context options.
mutex_t opt_sync;
diff --git a/src/mutex.hpp b/src/mutex.hpp
index 118b5ef..fc75bed 100644
--- a/src/mutex.hpp
+++ b/src/mutex.hpp
@@ -37,28 +37,34 @@ namespace xs
class mutex_t
{
public:
- inline mutex_t ()
+ inline mutex_t (bool fake_ = false) :
+ fake (fake_)
{
- InitializeCriticalSection (&cs);
+ if (!fake)
+ InitializeCriticalSection (&cs);
}
inline ~mutex_t ()
{
- DeleteCriticalSection (&cs);
+ if (!fake)
+ DeleteCriticalSection (&cs);
}
inline void lock ()
{
- EnterCriticalSection (&cs);
+ if (!fake)
+ EnterCriticalSection (&cs);
}
inline void unlock ()
{
- LeaveCriticalSection (&cs);
+ if (!fake)
+ LeaveCriticalSection (&cs);
}
private:
+ bool fake;
CRITICAL_SECTION cs;
// Disable copy construction and assignment.
@@ -78,36 +84,46 @@ namespace xs
class mutex_t
{
public:
- inline mutex_t ()
+ inline mutex_t (bool fake_ = false) :
+ fake (fake_)
{
- int rc = pthread_mutex_init (&mutex, NULL);
- if (rc)
- posix_assert (rc);
+ if (!fake) {
+ int rc = pthread_mutex_init (&mutex, NULL);
+ if (rc)
+ posix_assert (rc);
+ }
}
inline ~mutex_t ()
{
- int rc = pthread_mutex_destroy (&mutex);
- if (rc)
- posix_assert (rc);
+ if (!fake) {
+ int rc = pthread_mutex_destroy (&mutex);
+ if (rc)
+ posix_assert (rc);
+ }
}
inline void lock ()
{
- int rc = pthread_mutex_lock (&mutex);
- if (rc)
- posix_assert (rc);
+ if (!fake) {
+ int rc = pthread_mutex_lock (&mutex);
+ if (rc)
+ posix_assert (rc);
+ }
}
inline void unlock ()
{
- int rc = pthread_mutex_unlock (&mutex);
- if (rc)
- posix_assert (rc);
+ if (!fake) {
+ int rc = pthread_mutex_unlock (&mutex);
+ if (rc)
+ posix_assert (rc);
+ }
}
private:
+ bool fake;
pthread_mutex_t mutex;
// Disable copy construction and assignment.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index f3ae291..718263f 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -117,6 +117,7 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_,
xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
own_t (parent_, tid_),
tag (0xbaddecaf),
+ sync (!parent_->is_reentrant ()),
ctx_terminated (false),
destroyed (false),
last_tsc (0),
@@ -226,57 +227,74 @@ void xs::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_)
int xs::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
+ sync.lock ();
+
if (unlikely (ctx_terminated)) {
+ sync.unlock ();
errno = ETERM;
return -1;
}
// First, check whether specific socket type overloads the option.
int rc = xsetsockopt (option_, optval_, optvallen_);
- if (rc == 0 || errno != EINVAL)
+ if (rc == 0 || errno != EINVAL) {
+ sync.unlock ();
return rc;
+ }
// If the socket type doesn't support the option, pass it to
// the generic option parser.
- return options.setsockopt (option_, optval_, optvallen_);
+ rc = options.setsockopt (option_, optval_, optvallen_);
+ sync.unlock ();
+ return rc;
}
int xs::socket_base_t::getsockopt (int option_, void *optval_,
size_t *optvallen_)
{
+ sync.lock ();
+
if (unlikely (ctx_terminated)) {
+ sync.unlock ();
errno = ETERM;
return -1;
}
if (option_ == XS_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
+ sync.unlock ();
errno = EINVAL;
return -1;
}
*((int*) optval_) = rcvmore ? 1 : 0;
*optvallen_ = sizeof (int);
+ sync.unlock ();
return 0;
}
if (option_ == XS_FD) {
if (*optvallen_ < sizeof (fd_t)) {
errno = EINVAL;
+ sync.unlock ();
return -1;
}
*((fd_t*) optval_) = mailbox.get_fd ();
*optvallen_ = sizeof (fd_t);
+ sync.unlock ();
return 0;
}
if (option_ == XS_EVENTS) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
+ sync.unlock ();
return -1;
}
int rc = process_commands (0, false);
- if (rc != 0 && (errno == EINTR || errno == ETERM))
+ if (rc != 0 && (errno == EINTR || errno == ETERM)) {
+ sync.unlock ();
return -1;
+ }
errno_assert (rc == 0);
*((int*) optval_) = 0;
if (has_out ())
@@ -284,6 +302,7 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_,
if (has_in ())
*((int*) optval_) |= XS_POLLIN;
*optvallen_ = sizeof (int);
+ sync.unlock ();
return 0;
}
@@ -292,7 +311,10 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_,
int xs::socket_base_t::bind (const char *addr_)
{
+ sync.lock ();
+
if (unlikely (ctx_terminated)) {
+ sync.unlock ();
errno = ETERM;
return -1;
}
@@ -301,29 +323,38 @@ int xs::socket_base_t::bind (const char *addr_)
std::string protocol;
std::string address;
int rc = parse_uri (addr_, protocol, address);
- if (rc != 0)
+ if (rc != 0) {
+ sync.unlock ();
return -1;
+ }
rc = check_protocol (protocol);
- if (rc != 0)
+ if (rc != 0) {
+ sync.unlock ();
return -1;
+ }
if (protocol == "inproc") {
endpoint_t endpoint = {this, options};
- return register_endpoint (addr_, endpoint);
+ rc = register_endpoint (addr_, endpoint);
+ sync.unlock ();
+ return rc;
}
if (protocol == "pgm" || protocol == "epgm") {
// For convenience's sake, bind can be used interchageable with
// connect for PGM and EPGM transports.
- return connect (addr_);
+ rc = connect (addr_);
+ sync.unlock ();
+ return rc;
}
// Remaining trasnports require to be run in an I/O thread, so at this
// point we'll choose one.
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
+ sync.unlock ();
errno = EMTHREAD;
return -1;
}
@@ -335,9 +366,11 @@ int xs::socket_base_t::bind (const char *addr_)
int rc = listener->set_address (address.c_str ());
if (rc != 0) {
delete listener;
+ sync.unlock ();
return -1;
}
launch_child (listener);
+ sync.unlock ();
return 0;
}
@@ -349,9 +382,11 @@ int xs::socket_base_t::bind (const char *addr_)
int rc = listener->set_address (address.c_str ());
if (rc != 0) {
delete listener;
+ sync.unlock ();
return -1;
}
launch_child (listener);
+ sync.unlock ();
return 0;
}
#endif
@@ -362,7 +397,10 @@ int xs::socket_base_t::bind (const char *addr_)
int xs::socket_base_t::connect (const char *addr_)
{
+ sync.lock ();
+
if (unlikely (ctx_terminated)) {
+ sync.unlock ();
errno = ETERM;
return -1;
}
@@ -371,12 +409,16 @@ int xs::socket_base_t::connect (const char *addr_)
std::string protocol;
std::string address;
int rc = parse_uri (addr_, protocol, address);
- if (rc != 0)
+ if (rc != 0) {
+ sync.unlock ();
return -1;
+ }
rc = check_protocol (protocol);
- if (rc != 0)
+ if (rc != 0) {
+ sync.unlock ();
return -1;
+ }
if (protocol == "inproc") {
@@ -386,8 +428,10 @@ int xs::socket_base_t::connect (const char *addr_)
// Find the peer endpoint.
endpoint_t peer = find_endpoint (addr_);
- if (!peer.socket)
+ if (!peer.socket) {
+ sync.unlock ();
return -1;
+ }
// The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM.
@@ -429,12 +473,14 @@ int xs::socket_base_t::connect (const char *addr_)
// increased here.
send_bind (peer.socket, pipes [1], false);
+ sync.unlock ();
return 0;
}
// Choose the I/O thread to run the session in.
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
+ sync.unlock ();
errno = EMTHREAD;
return -1;
}
@@ -467,27 +513,34 @@ int xs::socket_base_t::connect (const char *addr_)
// Activate the session. Make it a child of this socket.
launch_child (session);
+ sync.unlock ();
return 0;
}
int xs::socket_base_t::send (msg_t *msg_, int flags_)
{
+ sync.lock ();
+
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
+ sync.unlock ();
errno = ETERM;
return -1;
}
// Check whether message passed to the function is valid.
if (unlikely (!msg_ || !msg_->check ())) {
+ sync.unlock ();
errno = EFAULT;
return -1;
}
// Process pending commands, if any.
int rc = process_commands (0, true);
- if (unlikely (rc != 0))
+ if (unlikely (rc != 0)) {
+ sync.unlock ();
return -1;
+ }
// Clear any user-visible flags that are set on the message.
msg_->reset_flags (msg_t::more);
@@ -498,15 +551,21 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_)
// Try to send the message.
rc = xsend (msg_, flags_);
- if (rc == 0)
+ if (rc == 0) {
+ sync.unlock ();
return 0;
- if (unlikely (errno != EAGAIN))
+ }
+ if (unlikely (errno != EAGAIN)) {
+ sync.unlock ();
return -1;
+ }
// In case of non-blocking send we'll simply propagate
// the error - including EAGAIN - up the stack.
- if (flags_ & XS_DONTWAIT || options.sndtimeo == 0)
+ if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) {
+ sync.unlock ();
return -1;
+ }
// Compute the time when the timeout should occur.
// If the timeout is infite, don't care.
@@ -518,42 +577,55 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_)
// command, process it and try to send the message again.
// If timeout is reached in the meantime, return EAGAIN.
while (true) {
- if (unlikely (process_commands (timeout, false) != 0))
+ if (unlikely (process_commands (timeout, false) != 0)) {
+ sync.unlock ();
return -1;
+ }
rc = xsend (msg_, flags_);
if (rc == 0)
break;
- if (unlikely (errno != EAGAIN))
+ if (unlikely (errno != EAGAIN)) {
+ sync.unlock ();
return -1;
+ }
if (timeout > 0) {
timeout = (int) (end - clock.now_ms ());
if (timeout <= 0) {
+ sync.unlock ();
errno = EAGAIN;
return -1;
}
}
}
+
+ sync.unlock ();
return 0;
}
int xs::socket_base_t::recv (msg_t *msg_, int flags_)
{
+ sync.lock ();
+
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
+ sync.unlock ();
errno = ETERM;
return -1;
}
// Check whether message passed to the function is valid.
if (unlikely (!msg_ || !msg_->check ())) {
+ sync.unlock ();
errno = EFAULT;
return -1;
}
// Get the message.
int rc = xrecv (msg_, flags_);
- if (unlikely (rc != 0 && errno != EAGAIN))
+ if (unlikely (rc != 0 && errno != EAGAIN)) {
+ sync.unlock ();
return -1;
+ }
// Once every inbound_poll_rate messages check for signals and process
// incoming commands. This happens only if we are not polling altogether
@@ -572,6 +644,7 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)
// If we have the message, return immediately.
if (rc == 0) {
extract_flags (msg_);
+ sync.unlock ();
return 0;
}
@@ -580,14 +653,20 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)
// activate_reader command already waiting int a command pipe.
// If it's not, return EAGAIN.
if (flags_ & XS_DONTWAIT || options.rcvtimeo == 0) {
- if (unlikely (process_commands (0, false) != 0))
+ if (unlikely (process_commands (0, false) != 0)) {
+ sync.unlock ();
return -1;
+ }
ticks = 0;
rc = xrecv (msg_, flags_);
- if (rc < 0)
+ if (rc < 0) {
+ sync.unlock ();
return rc;
+ }
+
extract_flags (msg_);
+ sync.unlock ();
return 0;
}
@@ -601,19 +680,24 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)
// we are able to fetch a message.
bool block = (ticks != 0);
while (true) {
- if (unlikely (process_commands (block ? timeout : 0, false) != 0))
+ if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
+ sync.unlock ();
return -1;
+ }
rc = xrecv (msg_, flags_);
if (rc == 0) {
ticks = 0;
break;
}
- if (unlikely (errno != EAGAIN))
+ if (unlikely (errno != EAGAIN)) {
+ sync.unlock ();
return -1;
+ }
block = true;
if (timeout > 0) {
timeout = (int) (end - clock.now_ms ());
if (timeout <= 0) {
+ sync.unlock ();
errno = EAGAIN;
return -1;
}
@@ -621,11 +705,14 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)
}
extract_flags (msg_);
+ sync.unlock ();
return 0;
}
int xs::socket_base_t::close ()
{
+ sync.lock ();
+
// Mark the socket as dead.
tag = 0xdeadbeef;
@@ -634,17 +721,24 @@ int xs::socket_base_t::close ()
// process.
send_reap (this);
+ sync.unlock ();
return 0;
}
bool xs::socket_base_t::has_in ()
{
- return xhas_in ();
+ sync.lock ();
+ bool ret = xhas_in ();
+ sync.unlock ();
+ return ret;
}
bool xs::socket_base_t::has_out ()
{
- return xhas_out ();
+ sync.lock ();
+ bool ret = xhas_out ();
+ sync.unlock ();
+ return ret;
}
void xs::socket_base_t::start_reaping (poller_base_t *poller_)
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 1e35ffa..850586e 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -141,6 +141,10 @@ namespace xs
// Used to check whether the object is a socket.
uint32_t tag;
+ // Synchronisation of access to the socket. If Crossroads are running
+ // in non-reentrant mode, it is a dummy mutex-like object.
+ mutex_t sync;
+
// If true, associated context was already terminated.
bool ctx_terminated;