summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore1
-rw-r--r--doc/xs_setctxopt.txt16
-rw-r--r--include/xs.h1
-rw-r--r--src/ctx.cpp21
-rw-r--r--src/ctx.hpp6
-rw-r--r--src/mutex.hpp52
-rw-r--r--src/socket_base.cpp140
-rw-r--r--src/socket_base.hpp4
-rw-r--r--tests/Makefile.am4
-rw-r--r--tests/reentrant.cpp53
10 files changed, 255 insertions, 43 deletions
diff --git a/.gitignore b/.gitignore
index 4450075..83b6bdb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -37,6 +37,7 @@ tests/msg_flags
tests/reconnect
tests/linger
tests/max_sockets
+tests/reentrant
src/platform.hpp*
src/stamp-h1
perf/local_lat
diff --git a/doc/xs_setctxopt.txt b/doc/xs_setctxopt.txt
index be9e654..bc7d018 100644
--- a/doc/xs_setctxopt.txt
+++ b/doc/xs_setctxopt.txt
@@ -34,6 +34,22 @@ Option value type:: int
Option value unit:: sockets
Default value:: 512
+XS_CTX_REENTRANT: Specify whether sockets should be thread-safe
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+If 'XS_CTX_REENTRANT' option is set to 1 it is safe to access single Crossroads
+socket from multiple threads in parallel. If it is set to 0 it can be accessed
+by at most one thread at any single point of time.
+
+Note: By default Crossroads sockets are non-reentrant. If possible, try to use
+the socket from the thread it was created in. If communication between threads
+is needed use inproc transport. Not following this advice can introduce
+scalability problems.
+
+[horizontal]
+Option value type:: int
+Option value unit:: boolean
+Default value:: 0
+
RETURN VALUE
------------
The _xs_setctxopt()_ function shall return zero if successful. Otherwise it
diff --git a/include/xs.h b/include/xs.h
index 3aa1045..4d0be57 100644
--- a/include/xs.h
+++ b/include/xs.h
@@ -147,6 +147,7 @@ XS_EXPORT int xs_getmsgopt (xs_msg_t *msg, int option, void *optval,
/******************************************************************************/
#define XS_CTX_MAX_SOCKETS 1
+#define XS_CTX_REENTRANT 2
XS_EXPORT void *xs_init (int io_threads);
XS_EXPORT int xs_term (void *context);
diff --git a/src/ctx.cpp b/src/ctx.cpp
index b6298e2..917c3e6 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -43,7 +43,8 @@ xs::ctx_t::ctx_t (uint32_t io_threads_) :
starting (true),
terminating (false),
max_sockets (512),
- io_thread_count (io_threads_)
+ io_thread_count (io_threads_),
+ reentrant (false)
{
}
@@ -147,6 +148,16 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_)
max_sockets = *((int*) optval_);
opt_sync.unlock ();
break;
+ case XS_CTX_REENTRANT:
+ if (optvallen_ != sizeof (int) || (*((int*) optval_) != 0 &&
+ *((int*) optval_) != 1)) {
+ errno = EINVAL;
+ return -1;
+ }
+ opt_sync.lock ();
+ reentrant = (*((int*) optval_) ? true : false);
+ opt_sync.unlock ();
+ break;
default:
errno = EINVAL;
return -1;
@@ -379,6 +390,14 @@ void xs::ctx_t::publish_logs (const char *text_)
log_sync.unlock ();
}
+bool xs::ctx_t::is_reentrant ()
+{
+ opt_sync.lock ();
+ bool ret = reentrant;
+ opt_sync.unlock ();
+ return ret;
+}
+
// The last used socket ID, or 0 if no socket was used so far. Note that this
// is a global variable. Thus, even sockets created in different contexts have
// unique IDs.
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 56b5d4c..e912443 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -100,6 +100,9 @@ namespace xs
void log (int sid_, const char *text_);
void publish_logs (const char *text_);
+ // True, if API is expected to be reentrant.
+ bool is_reentrant ();
+
enum {
term_tid = 0,
reaper_tid = 1
@@ -173,6 +176,9 @@ namespace xs
// Number of I/O threads to launch.
uint32_t io_thread_count;
+ // True, if API is expected to be reentrant.
+ bool reentrant;
+
// Synchronisation of access to context options.
mutex_t opt_sync;
diff --git a/src/mutex.hpp b/src/mutex.hpp
index 118b5ef..fc75bed 100644
--- a/src/mutex.hpp
+++ b/src/mutex.hpp
@@ -37,28 +37,34 @@ namespace xs
class mutex_t
{
public:
- inline mutex_t ()
+ inline mutex_t (bool fake_ = false) :
+ fake (fake_)
{
- InitializeCriticalSection (&cs);
+ if (!fake)
+ InitializeCriticalSection (&cs);
}
inline ~mutex_t ()
{
- DeleteCriticalSection (&cs);
+ if (!fake)
+ DeleteCriticalSection (&cs);
}
inline void lock ()
{
- EnterCriticalSection (&cs);
+ if (!fake)
+ EnterCriticalSection (&cs);
}
inline void unlock ()
{
- LeaveCriticalSection (&cs);
+ if (!fake)
+ LeaveCriticalSection (&cs);
}
private:
+ bool fake;
CRITICAL_SECTION cs;
// Disable copy construction and assignment.
@@ -78,36 +84,46 @@ namespace xs
class mutex_t
{
public:
- inline mutex_t ()
+ inline mutex_t (bool fake_ = false) :
+ fake (fake_)
{
- int rc = pthread_mutex_init (&mutex, NULL);
- if (rc)
- posix_assert (rc);
+ if (!fake) {
+ int rc = pthread_mutex_init (&mutex, NULL);
+ if (rc)
+ posix_assert (rc);
+ }
}
inline ~mutex_t ()
{
- int rc = pthread_mutex_destroy (&mutex);
- if (rc)
- posix_assert (rc);
+ if (!fake) {
+ int rc = pthread_mutex_destroy (&mutex);
+ if (rc)
+ posix_assert (rc);
+ }
}
inline void lock ()
{
- int rc = pthread_mutex_lock (&mutex);
- if (rc)
- posix_assert (rc);
+ if (!fake) {
+ int rc = pthread_mutex_lock (&mutex);
+ if (rc)
+ posix_assert (rc);
+ }
}
inline void unlock ()
{
- int rc = pthread_mutex_unlock (&mutex);
- if (rc)
- posix_assert (rc);
+ if (!fake) {
+ int rc = pthread_mutex_unlock (&mutex);
+ if (rc)
+ posix_assert (rc);
+ }
}
private:
+ bool fake;
pthread_mutex_t mutex;
// Disable copy construction and assignment.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index f3ae291..718263f 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -117,6 +117,7 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_,
xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
own_t (parent_, tid_),
tag (0xbaddecaf),
+ sync (!parent_->is_reentrant ()),
ctx_terminated (false),
destroyed (false),
last_tsc (0),
@@ -226,57 +227,74 @@ void xs::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_)
int xs::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
+ sync.lock ();
+
if (unlikely (ctx_terminated)) {
+ sync.unlock ();
errno = ETERM;
return -1;
}
// First, check whether specific socket type overloads the option.
int rc = xsetsockopt (option_, optval_, optvallen_);
- if (rc == 0 || errno != EINVAL)
+ if (rc == 0 || errno != EINVAL) {
+ sync.unlock ();
return rc;
+ }
// If the socket type doesn't support the option, pass it to
// the generic option parser.
- return options.setsockopt (option_, optval_, optvallen_);
+ rc = options.setsockopt (option_, optval_, optvallen_);
+ sync.unlock ();
+ return rc;
}
int xs::socket_base_t::getsockopt (int option_, void *optval_,
size_t *optvallen_)
{
+ sync.lock ();
+
if (unlikely (ctx_terminated)) {
+ sync.unlock ();
errno = ETERM;
return -1;
}
if (option_ == XS_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
+ sync.unlock ();
errno = EINVAL;
return -1;
}
*((int*) optval_) = rcvmore ? 1 : 0;
*optvallen_ = sizeof (int);
+ sync.unlock ();
return 0;
}
if (option_ == XS_FD) {
if (*optvallen_ < sizeof (fd_t)) {
errno = EINVAL;
+ sync.unlock ();
return -1;
}
*((fd_t*) optval_) = mailbox.get_fd ();
*optvallen_ = sizeof (fd_t);
+ sync.unlock ();
return 0;
}
if (option_ == XS_EVENTS) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
+ sync.unlock ();
return -1;
}
int rc = process_commands (0, false);
- if (rc != 0 && (errno == EINTR || errno == ETERM))
+ if (rc != 0 && (errno == EINTR || errno == ETERM)) {
+ sync.unlock ();
return -1;
+ }
errno_assert (rc == 0);
*((int*) optval_) = 0;
if (has_out ())
@@ -284,6 +302,7 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_,
if (has_in ())
*((int*) optval_) |= XS_POLLIN;
*optvallen_ = sizeof (int);
+ sync.unlock ();
return 0;
}
@@ -292,7 +311,10 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_,
int xs::socket_base_t::bind (const char *addr_)
{
+ sync.lock ();
+
if (unlikely (ctx_terminated)) {
+ sync.unlock ();
errno = ETERM;
return -1;
}
@@ -301,29 +323,38 @@ int xs::socket_base_t::bind (const char *addr_)
std::string protocol;
std::string address;
int rc = parse_uri (addr_, protocol, address);
- if (rc != 0)
+ if (rc != 0) {
+ sync.unlock ();
return -1;
+ }
rc = check_protocol (protocol);
- if (rc != 0)
+ if (rc != 0) {
+ sync.unlock ();
return -1;
+ }
if (protocol == "inproc") {
endpoint_t endpoint = {this, options};
- return register_endpoint (addr_, endpoint);
+ rc = register_endpoint (addr_, endpoint);
+ sync.unlock ();
+ return rc;
}
if (protocol == "pgm" || protocol == "epgm") {
// For convenience's sake, bind can be used interchageable with
// connect for PGM and EPGM transports.
- return connect (addr_);
+ rc = connect (addr_);
+ sync.unlock ();
+ return rc;
}
// Remaining trasnports require to be run in an I/O thread, so at this
// point we'll choose one.
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
+ sync.unlock ();
errno = EMTHREAD;
return -1;
}
@@ -335,9 +366,11 @@ int xs::socket_base_t::bind (const char *addr_)
int rc = listener->set_address (address.c_str ());
if (rc != 0) {
delete listener;
+ sync.unlock ();
return -1;
}
launch_child (listener);
+ sync.unlock ();
return 0;
}
@@ -349,9 +382,11 @@ int xs::socket_base_t::bind (const char *addr_)
int rc = listener->set_address (address.c_str ());
if (rc != 0) {
delete listener;
+ sync.unlock ();
return -1;
}
launch_child (listener);
+ sync.unlock ();
return 0;
}
#endif
@@ -362,7 +397,10 @@ int xs::socket_base_t::bind (const char *addr_)
int xs::socket_base_t::connect (const char *addr_)
{
+ sync.lock ();
+
if (unlikely (ctx_terminated)) {
+ sync.unlock ();
errno = ETERM;
return -1;
}
@@ -371,12 +409,16 @@ int xs::socket_base_t::connect (const char *addr_)
std::string protocol;
std::string address;
int rc = parse_uri (addr_, protocol, address);
- if (rc != 0)
+ if (rc != 0) {
+ sync.unlock ();
return -1;
+ }
rc = check_protocol (protocol);
- if (rc != 0)
+ if (rc != 0) {
+ sync.unlock ();
return -1;
+ }
if (protocol == "inproc") {
@@ -386,8 +428,10 @@ int xs::socket_base_t::connect (const char *addr_)
// Find the peer endpoint.
endpoint_t peer = find_endpoint (addr_);
- if (!peer.socket)
+ if (!peer.socket) {
+ sync.unlock ();
return -1;
+ }
// The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM.
@@ -429,12 +473,14 @@ int xs::socket_base_t::connect (const char *addr_)
// increased here.
send_bind (peer.socket, pipes [1], false);
+ sync.unlock ();
return 0;
}
// Choose the I/O thread to run the session in.
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
+ sync.unlock ();
errno = EMTHREAD;
return -1;
}
@@ -467,27 +513,34 @@ int xs::socket_base_t::connect (const char *addr_)
// Activate the session. Make it a child of this socket.
launch_child (session);
+ sync.unlock ();
return 0;
}
int xs::socket_base_t::send (msg_t *msg_, int flags_)
{
+ sync.lock ();
+
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
+ sync.unlock ();
errno = ETERM;
return -1;
}
// Check whether message passed to the function is valid.
if (unlikely (!msg_ || !msg_->check ())) {
+ sync.unlock ();
errno = EFAULT;
return -1;
}
// Process pending commands, if any.
int rc = process_commands (0, true);
- if (unlikely (rc != 0))
+ if (unlikely (rc != 0)) {
+ sync.unlock ();
return -1;
+ }
// Clear any user-visible flags that are set on the message.
msg_->reset_flags (msg_t::more);
@@ -498,15 +551,21 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_)
// Try to send the message.
rc = xsend (msg_, flags_);
- if (rc == 0)
+ if (rc == 0) {
+ sync.unlock ();
return 0;
- if (unlikely (errno != EAGAIN))
+ }
+ if (unlikely (errno != EAGAIN)) {
+ sync.unlock ();
return -1;
+ }
// In case of non-blocking send we'll simply propagate
// the error - including EAGAIN - up the stack.
- if (flags_ & XS_DONTWAIT || options.sndtimeo == 0)
+ if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) {
+ sync.unlock ();
return -1;
+ }
// Compute the time when the timeout should occur.
// If the timeout is infite, don't care.
@@ -518,42 +577,55 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_)
// command, process it and try to send the message again.
// If timeout is reached in the meantime, return EAGAIN.
while (true) {
- if (unlikely (process_commands (timeout, false) != 0))
+ if (unlikely (process_commands (timeout, false) != 0)) {
+ sync.unlock ();
return -1;
+ }
rc = xsend (msg_, flags_);
if (rc == 0)
break;
- if (unlikely (errno != EAGAIN))
+ if (unlikely (errno != EAGAIN)) {
+ sync.unlock ();
return -1;
+ }
if (timeout > 0) {
timeout = (int) (end - clock.now_ms ());
if (timeout <= 0) {
+ sync.unlock ();
errno = EAGAIN;
return -1;
}
}
}
+
+ sync.unlock ();
return 0;
}
int xs::socket_base_t::recv (msg_t *msg_, int flags_)
{
+ sync.lock ();
+
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
+ sync.unlock ();
errno = ETERM;
return -1;
}
// Check whether message passed to the function is valid.
if (unlikely (!msg_ || !msg_->check ())) {
+ sync.unlock ();
errno = EFAULT;
return -1;
}
// Get the message.
int rc = xrecv (msg_, flags_);
- if (unlikely (rc != 0 && errno != EAGAIN))
+ if (unlikely (rc != 0 && errno != EAGAIN)) {
+ sync.unlock ();
return -1;
+ }
// Once every inbound_poll_rate messages check for signals and process
// incoming commands. This happens only if we are not polling altogether
@@ -572,6 +644,7 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)
// If we have the message, return immediately.
if (rc == 0) {
extract_flags (msg_);
+ sync.unlock ();
return 0;
}
@@ -580,14 +653,20 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)
// activate_reader command already waiting int a command pipe.
// If it's not, return EAGAIN.
if (flags_ & XS_DONTWAIT || options.rcvtimeo == 0) {
- if (unlikely (process_commands (0, false) != 0))
+ if (unlikely (process_commands (0, false) != 0)) {
+ sync.unlock ();
return -1;
+ }
ticks = 0;
rc = xrecv (msg_, flags_);
- if (rc < 0)
+ if (rc < 0) {
+ sync.unlock ();
return rc;
+ }
+
extract_flags (msg_);
+ sync.unlock ();
return 0;
}
@@ -601,19 +680,24 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)
// we are able to fetch a message.
bool block = (ticks != 0);
while (true) {
- if (unlikely (process_commands (block ? timeout : 0, false) != 0))
+ if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
+ sync.unlock ();
return -1;
+ }
rc = xrecv (msg_, flags_);
if (rc == 0) {
ticks = 0;
break;
}
- if (unlikely (errno != EAGAIN))
+ if (unlikely (errno != EAGAIN)) {
+ sync.unlock ();
return -1;
+ }
block = true;
if (timeout > 0) {
timeout = (int) (end - clock.now_ms ());
if (timeout <= 0) {
+ sync.unlock ();
errno = EAGAIN;
return -1;
}
@@ -621,11 +705,14 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)
}
extract_flags (msg_);
+ sync.unlock ();
return 0;
}
int xs::socket_base_t::close ()
{
+ sync.lock ();
+
// Mark the socket as dead.
tag = 0xdeadbeef;
@@ -634,17 +721,24 @@ int xs::socket_base_t::close ()
// process.
send_reap (this);
+ sync.unlock ();
return 0;
}
bool xs::socket_base_t::has_in ()
{
- return xhas_in ();
+ sync.lock ();
+ bool ret = xhas_in ();
+ sync.unlock ();
+ return ret;
}
bool xs::socket_base_t::has_out ()
{
- return xhas_out ();
+ sync.lock ();
+ bool ret = xhas_out ();
+ sync.unlock ();
+ return ret;
}
void xs::socket_base_t::start_reaping (poller_base_t *poller_)
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 1e35ffa..850586e 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -141,6 +141,10 @@ namespace xs
// Used to check whether the object is a socket.
uint32_t tag;
+ // Synchronisation of access to the socket. If Crossroads are running
+ // in non-reentrant mode, it is a dummy mutex-like object.
+ mutex_t sync;
+
// If true, associated context was already terminated.
bool ctx_terminated;
diff --git a/tests/Makefile.am b/tests/Makefile.am
index fba6271..78bac57 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -18,7 +18,8 @@ noinst_PROGRAMS = pair_inproc \
pair_ipc \
reqrep_ipc \
timeo \
- max_sockets
+ max_sockets \
+ reentrant
pair_inproc_SOURCES = pair_inproc.cpp testutil.hpp
pair_tcp_SOURCES = pair_tcp.cpp testutil.hpp
@@ -36,5 +37,6 @@ pair_ipc_SOURCES = pair_ipc.cpp testutil.hpp
reqrep_ipc_SOURCES = reqrep_ipc.cpp testutil.hpp
timeo_SOURCES = timeo.cpp
max_sockets_SOURCES = max_sockets.cpp
+reentrant_SOURCES = reentrant.cpp testutil.hpp
TESTS = $(noinst_PROGRAMS)
diff --git a/tests/reentrant.cpp b/tests/reentrant.cpp
new file mode 100644
index 0000000..a4b2760
--- /dev/null
+++ b/tests/reentrant.cpp
@@ -0,0 +1,53 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads project.
+
+ Crossroads is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "testutil.hpp"
+
+int XS_TEST_MAIN ()
+{
+ fprintf (stderr, "reentrant test running...\n");
+
+ // Initialise the context and set REENTRANT option.
+ void *ctx = xs_init (1);
+ assert (ctx);
+ int val = 1;
+ int rc = xs_setctxopt (ctx, XS_CTX_REENTRANT, &val, sizeof (val));
+ assert (rc == 0);
+
+ // Do a set of operations to make sure that REENTRANT option doesn't
+ // break anything.
+ void *sb = xs_socket (ctx, XS_REP);
+ assert (sb);
+ rc = xs_bind (sb, "inproc://a");
+ assert (rc == 0);
+ void *sc = xs_socket (ctx, XS_REQ);
+ assert (sc);
+ rc = xs_connect (sc, "inproc://a");
+ assert (rc == 0);
+ bounce (sb, sc);
+ rc = xs_close (sc);
+ assert (rc == 0);
+ rc = xs_close (sb);
+ assert (rc == 0);
+ rc = xs_term (ctx);
+ assert (rc == 0);
+
+ return 0 ;
+}