From 1e01248efc113cc9389f795157400a634730823e Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 16 Feb 2012 10:08:43 +0900 Subject: XS_CTX_REENTRANT option added Signed-off-by: Martin Sustrik --- .gitignore | 1 + doc/xs_setctxopt.txt | 16 ++++++ include/xs.h | 1 + src/ctx.cpp | 21 +++++++- src/ctx.hpp | 6 +++ src/mutex.hpp | 52 ++++++++++++------- src/socket_base.cpp | 140 ++++++++++++++++++++++++++++++++++++++++++--------- src/socket_base.hpp | 4 ++ tests/Makefile.am | 4 +- tests/reentrant.cpp | 53 +++++++++++++++++++ 10 files changed, 255 insertions(+), 43 deletions(-) create mode 100644 tests/reentrant.cpp diff --git a/.gitignore b/.gitignore index 4450075..83b6bdb 100644 --- a/.gitignore +++ b/.gitignore @@ -37,6 +37,7 @@ tests/msg_flags tests/reconnect tests/linger tests/max_sockets +tests/reentrant src/platform.hpp* src/stamp-h1 perf/local_lat diff --git a/doc/xs_setctxopt.txt b/doc/xs_setctxopt.txt index be9e654..bc7d018 100644 --- a/doc/xs_setctxopt.txt +++ b/doc/xs_setctxopt.txt @@ -34,6 +34,22 @@ Option value type:: int Option value unit:: sockets Default value:: 512 +XS_CTX_REENTRANT: Specify whether sockets should be thread-safe +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +If 'XS_CTX_REENTRANT' option is set to 1 it is safe to access single Crossroads +socket from multiple threads in parallel. If it is set to 0 it can be accessed +by at most one thread at any single point of time. + +Note: By default Crossroads sockets are non-reentrant. If possible, try to use +the socket from the thread it was created in. If communication between threads +is needed use inproc transport. Not following this advice can introduce +scalability problems. + +[horizontal] +Option value type:: int +Option value unit:: boolean +Default value:: 0 + RETURN VALUE ------------ The _xs_setctxopt()_ function shall return zero if successful. Otherwise it diff --git a/include/xs.h b/include/xs.h index 3aa1045..4d0be57 100644 --- a/include/xs.h +++ b/include/xs.h @@ -147,6 +147,7 @@ XS_EXPORT int xs_getmsgopt (xs_msg_t *msg, int option, void *optval, /******************************************************************************/ #define XS_CTX_MAX_SOCKETS 1 +#define XS_CTX_REENTRANT 2 XS_EXPORT void *xs_init (int io_threads); XS_EXPORT int xs_term (void *context); diff --git a/src/ctx.cpp b/src/ctx.cpp index b6298e2..917c3e6 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -43,7 +43,8 @@ xs::ctx_t::ctx_t (uint32_t io_threads_) : starting (true), terminating (false), max_sockets (512), - io_thread_count (io_threads_) + io_thread_count (io_threads_), + reentrant (false) { } @@ -147,6 +148,16 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) max_sockets = *((int*) optval_); opt_sync.unlock (); break; + case XS_CTX_REENTRANT: + if (optvallen_ != sizeof (int) || (*((int*) optval_) != 0 && + *((int*) optval_) != 1)) { + errno = EINVAL; + return -1; + } + opt_sync.lock (); + reentrant = (*((int*) optval_) ? true : false); + opt_sync.unlock (); + break; default: errno = EINVAL; return -1; @@ -379,6 +390,14 @@ void xs::ctx_t::publish_logs (const char *text_) log_sync.unlock (); } +bool xs::ctx_t::is_reentrant () +{ + opt_sync.lock (); + bool ret = reentrant; + opt_sync.unlock (); + return ret; +} + // The last used socket ID, or 0 if no socket was used so far. Note that this // is a global variable. Thus, even sockets created in different contexts have // unique IDs. diff --git a/src/ctx.hpp b/src/ctx.hpp index 56b5d4c..e912443 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -100,6 +100,9 @@ namespace xs void log (int sid_, const char *text_); void publish_logs (const char *text_); + // True, if API is expected to be reentrant. + bool is_reentrant (); + enum { term_tid = 0, reaper_tid = 1 @@ -173,6 +176,9 @@ namespace xs // Number of I/O threads to launch. uint32_t io_thread_count; + // True, if API is expected to be reentrant. + bool reentrant; + // Synchronisation of access to context options. mutex_t opt_sync; diff --git a/src/mutex.hpp b/src/mutex.hpp index 118b5ef..fc75bed 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -37,28 +37,34 @@ namespace xs class mutex_t { public: - inline mutex_t () + inline mutex_t (bool fake_ = false) : + fake (fake_) { - InitializeCriticalSection (&cs); + if (!fake) + InitializeCriticalSection (&cs); } inline ~mutex_t () { - DeleteCriticalSection (&cs); + if (!fake) + DeleteCriticalSection (&cs); } inline void lock () { - EnterCriticalSection (&cs); + if (!fake) + EnterCriticalSection (&cs); } inline void unlock () { - LeaveCriticalSection (&cs); + if (!fake) + LeaveCriticalSection (&cs); } private: + bool fake; CRITICAL_SECTION cs; // Disable copy construction and assignment. @@ -78,36 +84,46 @@ namespace xs class mutex_t { public: - inline mutex_t () + inline mutex_t (bool fake_ = false) : + fake (fake_) { - int rc = pthread_mutex_init (&mutex, NULL); - if (rc) - posix_assert (rc); + if (!fake) { + int rc = pthread_mutex_init (&mutex, NULL); + if (rc) + posix_assert (rc); + } } inline ~mutex_t () { - int rc = pthread_mutex_destroy (&mutex); - if (rc) - posix_assert (rc); + if (!fake) { + int rc = pthread_mutex_destroy (&mutex); + if (rc) + posix_assert (rc); + } } inline void lock () { - int rc = pthread_mutex_lock (&mutex); - if (rc) - posix_assert (rc); + if (!fake) { + int rc = pthread_mutex_lock (&mutex); + if (rc) + posix_assert (rc); + } } inline void unlock () { - int rc = pthread_mutex_unlock (&mutex); - if (rc) - posix_assert (rc); + if (!fake) { + int rc = pthread_mutex_unlock (&mutex); + if (rc) + posix_assert (rc); + } } private: + bool fake; pthread_mutex_t mutex; // Disable copy construction and assignment. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index f3ae291..718263f 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -117,6 +117,7 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_, xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : own_t (parent_, tid_), tag (0xbaddecaf), + sync (!parent_->is_reentrant ()), ctx_terminated (false), destroyed (false), last_tsc (0), @@ -226,57 +227,74 @@ void xs::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_) int xs::socket_base_t::setsockopt (int option_, const void *optval_, size_t optvallen_) { + sync.lock (); + if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } // First, check whether specific socket type overloads the option. int rc = xsetsockopt (option_, optval_, optvallen_); - if (rc == 0 || errno != EINVAL) + if (rc == 0 || errno != EINVAL) { + sync.unlock (); return rc; + } // If the socket type doesn't support the option, pass it to // the generic option parser. - return options.setsockopt (option_, optval_, optvallen_); + rc = options.setsockopt (option_, optval_, optvallen_); + sync.unlock (); + return rc; } int xs::socket_base_t::getsockopt (int option_, void *optval_, size_t *optvallen_) { + sync.lock (); + if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } if (option_ == XS_RCVMORE) { if (*optvallen_ < sizeof (int)) { + sync.unlock (); errno = EINVAL; return -1; } *((int*) optval_) = rcvmore ? 1 : 0; *optvallen_ = sizeof (int); + sync.unlock (); return 0; } if (option_ == XS_FD) { if (*optvallen_ < sizeof (fd_t)) { errno = EINVAL; + sync.unlock (); return -1; } *((fd_t*) optval_) = mailbox.get_fd (); *optvallen_ = sizeof (fd_t); + sync.unlock (); return 0; } if (option_ == XS_EVENTS) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; + sync.unlock (); return -1; } int rc = process_commands (0, false); - if (rc != 0 && (errno == EINTR || errno == ETERM)) + if (rc != 0 && (errno == EINTR || errno == ETERM)) { + sync.unlock (); return -1; + } errno_assert (rc == 0); *((int*) optval_) = 0; if (has_out ()) @@ -284,6 +302,7 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_, if (has_in ()) *((int*) optval_) |= XS_POLLIN; *optvallen_ = sizeof (int); + sync.unlock (); return 0; } @@ -292,7 +311,10 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_, int xs::socket_base_t::bind (const char *addr_) { + sync.lock (); + if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } @@ -301,29 +323,38 @@ int xs::socket_base_t::bind (const char *addr_) std::string protocol; std::string address; int rc = parse_uri (addr_, protocol, address); - if (rc != 0) + if (rc != 0) { + sync.unlock (); return -1; + } rc = check_protocol (protocol); - if (rc != 0) + if (rc != 0) { + sync.unlock (); return -1; + } if (protocol == "inproc") { endpoint_t endpoint = {this, options}; - return register_endpoint (addr_, endpoint); + rc = register_endpoint (addr_, endpoint); + sync.unlock (); + return rc; } if (protocol == "pgm" || protocol == "epgm") { // For convenience's sake, bind can be used interchageable with // connect for PGM and EPGM transports. - return connect (addr_); + rc = connect (addr_); + sync.unlock (); + return rc; } // Remaining trasnports require to be run in an I/O thread, so at this // point we'll choose one. io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { + sync.unlock (); errno = EMTHREAD; return -1; } @@ -335,9 +366,11 @@ int xs::socket_base_t::bind (const char *addr_) int rc = listener->set_address (address.c_str ()); if (rc != 0) { delete listener; + sync.unlock (); return -1; } launch_child (listener); + sync.unlock (); return 0; } @@ -349,9 +382,11 @@ int xs::socket_base_t::bind (const char *addr_) int rc = listener->set_address (address.c_str ()); if (rc != 0) { delete listener; + sync.unlock (); return -1; } launch_child (listener); + sync.unlock (); return 0; } #endif @@ -362,7 +397,10 @@ int xs::socket_base_t::bind (const char *addr_) int xs::socket_base_t::connect (const char *addr_) { + sync.lock (); + if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } @@ -371,12 +409,16 @@ int xs::socket_base_t::connect (const char *addr_) std::string protocol; std::string address; int rc = parse_uri (addr_, protocol, address); - if (rc != 0) + if (rc != 0) { + sync.unlock (); return -1; + } rc = check_protocol (protocol); - if (rc != 0) + if (rc != 0) { + sync.unlock (); return -1; + } if (protocol == "inproc") { @@ -386,8 +428,10 @@ int xs::socket_base_t::connect (const char *addr_) // Find the peer endpoint. endpoint_t peer = find_endpoint (addr_); - if (!peer.socket) + if (!peer.socket) { + sync.unlock (); return -1; + } // The total HWM for an inproc connection should be the sum of // the binder's HWM and the connector's HWM. @@ -429,12 +473,14 @@ int xs::socket_base_t::connect (const char *addr_) // increased here. send_bind (peer.socket, pipes [1], false); + sync.unlock (); return 0; } // Choose the I/O thread to run the session in. io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { + sync.unlock (); errno = EMTHREAD; return -1; } @@ -467,27 +513,34 @@ int xs::socket_base_t::connect (const char *addr_) // Activate the session. Make it a child of this socket. launch_child (session); + sync.unlock (); return 0; } int xs::socket_base_t::send (msg_t *msg_, int flags_) { + sync.lock (); + // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } // Check whether message passed to the function is valid. if (unlikely (!msg_ || !msg_->check ())) { + sync.unlock (); errno = EFAULT; return -1; } // Process pending commands, if any. int rc = process_commands (0, true); - if (unlikely (rc != 0)) + if (unlikely (rc != 0)) { + sync.unlock (); return -1; + } // Clear any user-visible flags that are set on the message. msg_->reset_flags (msg_t::more); @@ -498,15 +551,21 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_) // Try to send the message. rc = xsend (msg_, flags_); - if (rc == 0) + if (rc == 0) { + sync.unlock (); return 0; - if (unlikely (errno != EAGAIN)) + } + if (unlikely (errno != EAGAIN)) { + sync.unlock (); return -1; + } // In case of non-blocking send we'll simply propagate // the error - including EAGAIN - up the stack. - if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) + if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) { + sync.unlock (); return -1; + } // Compute the time when the timeout should occur. // If the timeout is infite, don't care. @@ -518,42 +577,55 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_) // command, process it and try to send the message again. // If timeout is reached in the meantime, return EAGAIN. while (true) { - if (unlikely (process_commands (timeout, false) != 0)) + if (unlikely (process_commands (timeout, false) != 0)) { + sync.unlock (); return -1; + } rc = xsend (msg_, flags_); if (rc == 0) break; - if (unlikely (errno != EAGAIN)) + if (unlikely (errno != EAGAIN)) { + sync.unlock (); return -1; + } if (timeout > 0) { timeout = (int) (end - clock.now_ms ()); if (timeout <= 0) { + sync.unlock (); errno = EAGAIN; return -1; } } } + + sync.unlock (); return 0; } int xs::socket_base_t::recv (msg_t *msg_, int flags_) { + sync.lock (); + // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } // Check whether message passed to the function is valid. if (unlikely (!msg_ || !msg_->check ())) { + sync.unlock (); errno = EFAULT; return -1; } // Get the message. int rc = xrecv (msg_, flags_); - if (unlikely (rc != 0 && errno != EAGAIN)) + if (unlikely (rc != 0 && errno != EAGAIN)) { + sync.unlock (); return -1; + } // Once every inbound_poll_rate messages check for signals and process // incoming commands. This happens only if we are not polling altogether @@ -572,6 +644,7 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // If we have the message, return immediately. if (rc == 0) { extract_flags (msg_); + sync.unlock (); return 0; } @@ -580,14 +653,20 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // activate_reader command already waiting int a command pipe. // If it's not, return EAGAIN. if (flags_ & XS_DONTWAIT || options.rcvtimeo == 0) { - if (unlikely (process_commands (0, false) != 0)) + if (unlikely (process_commands (0, false) != 0)) { + sync.unlock (); return -1; + } ticks = 0; rc = xrecv (msg_, flags_); - if (rc < 0) + if (rc < 0) { + sync.unlock (); return rc; + } + extract_flags (msg_); + sync.unlock (); return 0; } @@ -601,19 +680,24 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // we are able to fetch a message. bool block = (ticks != 0); while (true) { - if (unlikely (process_commands (block ? timeout : 0, false) != 0)) + if (unlikely (process_commands (block ? timeout : 0, false) != 0)) { + sync.unlock (); return -1; + } rc = xrecv (msg_, flags_); if (rc == 0) { ticks = 0; break; } - if (unlikely (errno != EAGAIN)) + if (unlikely (errno != EAGAIN)) { + sync.unlock (); return -1; + } block = true; if (timeout > 0) { timeout = (int) (end - clock.now_ms ()); if (timeout <= 0) { + sync.unlock (); errno = EAGAIN; return -1; } @@ -621,11 +705,14 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) } extract_flags (msg_); + sync.unlock (); return 0; } int xs::socket_base_t::close () { + sync.lock (); + // Mark the socket as dead. tag = 0xdeadbeef; @@ -634,17 +721,24 @@ int xs::socket_base_t::close () // process. send_reap (this); + sync.unlock (); return 0; } bool xs::socket_base_t::has_in () { - return xhas_in (); + sync.lock (); + bool ret = xhas_in (); + sync.unlock (); + return ret; } bool xs::socket_base_t::has_out () { - return xhas_out (); + sync.lock (); + bool ret = xhas_out (); + sync.unlock (); + return ret; } void xs::socket_base_t::start_reaping (poller_base_t *poller_) diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 1e35ffa..850586e 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -141,6 +141,10 @@ namespace xs // Used to check whether the object is a socket. uint32_t tag; + // Synchronisation of access to the socket. If Crossroads are running + // in non-reentrant mode, it is a dummy mutex-like object. + mutex_t sync; + // If true, associated context was already terminated. bool ctx_terminated; diff --git a/tests/Makefile.am b/tests/Makefile.am index fba6271..78bac57 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -18,7 +18,8 @@ noinst_PROGRAMS = pair_inproc \ pair_ipc \ reqrep_ipc \ timeo \ - max_sockets + max_sockets \ + reentrant pair_inproc_SOURCES = pair_inproc.cpp testutil.hpp pair_tcp_SOURCES = pair_tcp.cpp testutil.hpp @@ -36,5 +37,6 @@ pair_ipc_SOURCES = pair_ipc.cpp testutil.hpp reqrep_ipc_SOURCES = reqrep_ipc.cpp testutil.hpp timeo_SOURCES = timeo.cpp max_sockets_SOURCES = max_sockets.cpp +reentrant_SOURCES = reentrant.cpp testutil.hpp TESTS = $(noinst_PROGRAMS) diff --git a/tests/reentrant.cpp b/tests/reentrant.cpp new file mode 100644 index 0000000..a4b2760 --- /dev/null +++ b/tests/reentrant.cpp @@ -0,0 +1,53 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads project. + + Crossroads is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" + +int XS_TEST_MAIN () +{ + fprintf (stderr, "reentrant test running...\n"); + + // Initialise the context and set REENTRANT option. + void *ctx = xs_init (1); + assert (ctx); + int val = 1; + int rc = xs_setctxopt (ctx, XS_CTX_REENTRANT, &val, sizeof (val)); + assert (rc == 0); + + // Do a set of operations to make sure that REENTRANT option doesn't + // break anything. + void *sb = xs_socket (ctx, XS_REP); + assert (sb); + rc = xs_bind (sb, "inproc://a"); + assert (rc == 0); + void *sc = xs_socket (ctx, XS_REQ); + assert (sc); + rc = xs_connect (sc, "inproc://a"); + assert (rc == 0); + bounce (sb, sc); + rc = xs_close (sc); + assert (rc == 0); + rc = xs_close (sb); + assert (rc == 0); + rc = xs_term (ctx); + assert (rc == 0); + + return 0 ; +} -- cgit v1.2.3