From 1e01248efc113cc9389f795157400a634730823e Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 16 Feb 2012 10:08:43 +0900 Subject: XS_CTX_REENTRANT option added Signed-off-by: Martin Sustrik --- src/ctx.cpp | 21 +++++++- src/ctx.hpp | 6 +++ src/mutex.hpp | 52 ++++++++++++------- src/socket_base.cpp | 140 +++++++++++++++++++++++++++++++++++++++++++--------- src/socket_base.hpp | 4 ++ 5 files changed, 181 insertions(+), 42 deletions(-) (limited to 'src') diff --git a/src/ctx.cpp b/src/ctx.cpp index b6298e2..917c3e6 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -43,7 +43,8 @@ xs::ctx_t::ctx_t (uint32_t io_threads_) : starting (true), terminating (false), max_sockets (512), - io_thread_count (io_threads_) + io_thread_count (io_threads_), + reentrant (false) { } @@ -147,6 +148,16 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) max_sockets = *((int*) optval_); opt_sync.unlock (); break; + case XS_CTX_REENTRANT: + if (optvallen_ != sizeof (int) || (*((int*) optval_) != 0 && + *((int*) optval_) != 1)) { + errno = EINVAL; + return -1; + } + opt_sync.lock (); + reentrant = (*((int*) optval_) ? true : false); + opt_sync.unlock (); + break; default: errno = EINVAL; return -1; @@ -379,6 +390,14 @@ void xs::ctx_t::publish_logs (const char *text_) log_sync.unlock (); } +bool xs::ctx_t::is_reentrant () +{ + opt_sync.lock (); + bool ret = reentrant; + opt_sync.unlock (); + return ret; +} + // The last used socket ID, or 0 if no socket was used so far. Note that this // is a global variable. Thus, even sockets created in different contexts have // unique IDs. diff --git a/src/ctx.hpp b/src/ctx.hpp index 56b5d4c..e912443 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -100,6 +100,9 @@ namespace xs void log (int sid_, const char *text_); void publish_logs (const char *text_); + // True, if API is expected to be reentrant. + bool is_reentrant (); + enum { term_tid = 0, reaper_tid = 1 @@ -173,6 +176,9 @@ namespace xs // Number of I/O threads to launch. uint32_t io_thread_count; + // True, if API is expected to be reentrant. + bool reentrant; + // Synchronisation of access to context options. mutex_t opt_sync; diff --git a/src/mutex.hpp b/src/mutex.hpp index 118b5ef..fc75bed 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -37,28 +37,34 @@ namespace xs class mutex_t { public: - inline mutex_t () + inline mutex_t (bool fake_ = false) : + fake (fake_) { - InitializeCriticalSection (&cs); + if (!fake) + InitializeCriticalSection (&cs); } inline ~mutex_t () { - DeleteCriticalSection (&cs); + if (!fake) + DeleteCriticalSection (&cs); } inline void lock () { - EnterCriticalSection (&cs); + if (!fake) + EnterCriticalSection (&cs); } inline void unlock () { - LeaveCriticalSection (&cs); + if (!fake) + LeaveCriticalSection (&cs); } private: + bool fake; CRITICAL_SECTION cs; // Disable copy construction and assignment. @@ -78,36 +84,46 @@ namespace xs class mutex_t { public: - inline mutex_t () + inline mutex_t (bool fake_ = false) : + fake (fake_) { - int rc = pthread_mutex_init (&mutex, NULL); - if (rc) - posix_assert (rc); + if (!fake) { + int rc = pthread_mutex_init (&mutex, NULL); + if (rc) + posix_assert (rc); + } } inline ~mutex_t () { - int rc = pthread_mutex_destroy (&mutex); - if (rc) - posix_assert (rc); + if (!fake) { + int rc = pthread_mutex_destroy (&mutex); + if (rc) + posix_assert (rc); + } } inline void lock () { - int rc = pthread_mutex_lock (&mutex); - if (rc) - posix_assert (rc); + if (!fake) { + int rc = pthread_mutex_lock (&mutex); + if (rc) + posix_assert (rc); + } } inline void unlock () { - int rc = pthread_mutex_unlock (&mutex); - if (rc) - posix_assert (rc); + if (!fake) { + int rc = pthread_mutex_unlock (&mutex); + if (rc) + posix_assert (rc); + } } private: + bool fake; pthread_mutex_t mutex; // Disable copy construction and assignment. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index f3ae291..718263f 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -117,6 +117,7 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_, xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : own_t (parent_, tid_), tag (0xbaddecaf), + sync (!parent_->is_reentrant ()), ctx_terminated (false), destroyed (false), last_tsc (0), @@ -226,57 +227,74 @@ void xs::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_) int xs::socket_base_t::setsockopt (int option_, const void *optval_, size_t optvallen_) { + sync.lock (); + if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } // First, check whether specific socket type overloads the option. int rc = xsetsockopt (option_, optval_, optvallen_); - if (rc == 0 || errno != EINVAL) + if (rc == 0 || errno != EINVAL) { + sync.unlock (); return rc; + } // If the socket type doesn't support the option, pass it to // the generic option parser. - return options.setsockopt (option_, optval_, optvallen_); + rc = options.setsockopt (option_, optval_, optvallen_); + sync.unlock (); + return rc; } int xs::socket_base_t::getsockopt (int option_, void *optval_, size_t *optvallen_) { + sync.lock (); + if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } if (option_ == XS_RCVMORE) { if (*optvallen_ < sizeof (int)) { + sync.unlock (); errno = EINVAL; return -1; } *((int*) optval_) = rcvmore ? 1 : 0; *optvallen_ = sizeof (int); + sync.unlock (); return 0; } if (option_ == XS_FD) { if (*optvallen_ < sizeof (fd_t)) { errno = EINVAL; + sync.unlock (); return -1; } *((fd_t*) optval_) = mailbox.get_fd (); *optvallen_ = sizeof (fd_t); + sync.unlock (); return 0; } if (option_ == XS_EVENTS) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; + sync.unlock (); return -1; } int rc = process_commands (0, false); - if (rc != 0 && (errno == EINTR || errno == ETERM)) + if (rc != 0 && (errno == EINTR || errno == ETERM)) { + sync.unlock (); return -1; + } errno_assert (rc == 0); *((int*) optval_) = 0; if (has_out ()) @@ -284,6 +302,7 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_, if (has_in ()) *((int*) optval_) |= XS_POLLIN; *optvallen_ = sizeof (int); + sync.unlock (); return 0; } @@ -292,7 +311,10 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_, int xs::socket_base_t::bind (const char *addr_) { + sync.lock (); + if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } @@ -301,29 +323,38 @@ int xs::socket_base_t::bind (const char *addr_) std::string protocol; std::string address; int rc = parse_uri (addr_, protocol, address); - if (rc != 0) + if (rc != 0) { + sync.unlock (); return -1; + } rc = check_protocol (protocol); - if (rc != 0) + if (rc != 0) { + sync.unlock (); return -1; + } if (protocol == "inproc") { endpoint_t endpoint = {this, options}; - return register_endpoint (addr_, endpoint); + rc = register_endpoint (addr_, endpoint); + sync.unlock (); + return rc; } if (protocol == "pgm" || protocol == "epgm") { // For convenience's sake, bind can be used interchageable with // connect for PGM and EPGM transports. - return connect (addr_); + rc = connect (addr_); + sync.unlock (); + return rc; } // Remaining trasnports require to be run in an I/O thread, so at this // point we'll choose one. io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { + sync.unlock (); errno = EMTHREAD; return -1; } @@ -335,9 +366,11 @@ int xs::socket_base_t::bind (const char *addr_) int rc = listener->set_address (address.c_str ()); if (rc != 0) { delete listener; + sync.unlock (); return -1; } launch_child (listener); + sync.unlock (); return 0; } @@ -349,9 +382,11 @@ int xs::socket_base_t::bind (const char *addr_) int rc = listener->set_address (address.c_str ()); if (rc != 0) { delete listener; + sync.unlock (); return -1; } launch_child (listener); + sync.unlock (); return 0; } #endif @@ -362,7 +397,10 @@ int xs::socket_base_t::bind (const char *addr_) int xs::socket_base_t::connect (const char *addr_) { + sync.lock (); + if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } @@ -371,12 +409,16 @@ int xs::socket_base_t::connect (const char *addr_) std::string protocol; std::string address; int rc = parse_uri (addr_, protocol, address); - if (rc != 0) + if (rc != 0) { + sync.unlock (); return -1; + } rc = check_protocol (protocol); - if (rc != 0) + if (rc != 0) { + sync.unlock (); return -1; + } if (protocol == "inproc") { @@ -386,8 +428,10 @@ int xs::socket_base_t::connect (const char *addr_) // Find the peer endpoint. endpoint_t peer = find_endpoint (addr_); - if (!peer.socket) + if (!peer.socket) { + sync.unlock (); return -1; + } // The total HWM for an inproc connection should be the sum of // the binder's HWM and the connector's HWM. @@ -429,12 +473,14 @@ int xs::socket_base_t::connect (const char *addr_) // increased here. send_bind (peer.socket, pipes [1], false); + sync.unlock (); return 0; } // Choose the I/O thread to run the session in. io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { + sync.unlock (); errno = EMTHREAD; return -1; } @@ -467,27 +513,34 @@ int xs::socket_base_t::connect (const char *addr_) // Activate the session. Make it a child of this socket. launch_child (session); + sync.unlock (); return 0; } int xs::socket_base_t::send (msg_t *msg_, int flags_) { + sync.lock (); + // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } // Check whether message passed to the function is valid. if (unlikely (!msg_ || !msg_->check ())) { + sync.unlock (); errno = EFAULT; return -1; } // Process pending commands, if any. int rc = process_commands (0, true); - if (unlikely (rc != 0)) + if (unlikely (rc != 0)) { + sync.unlock (); return -1; + } // Clear any user-visible flags that are set on the message. msg_->reset_flags (msg_t::more); @@ -498,15 +551,21 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_) // Try to send the message. rc = xsend (msg_, flags_); - if (rc == 0) + if (rc == 0) { + sync.unlock (); return 0; - if (unlikely (errno != EAGAIN)) + } + if (unlikely (errno != EAGAIN)) { + sync.unlock (); return -1; + } // In case of non-blocking send we'll simply propagate // the error - including EAGAIN - up the stack. - if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) + if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) { + sync.unlock (); return -1; + } // Compute the time when the timeout should occur. // If the timeout is infite, don't care. @@ -518,42 +577,55 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_) // command, process it and try to send the message again. // If timeout is reached in the meantime, return EAGAIN. while (true) { - if (unlikely (process_commands (timeout, false) != 0)) + if (unlikely (process_commands (timeout, false) != 0)) { + sync.unlock (); return -1; + } rc = xsend (msg_, flags_); if (rc == 0) break; - if (unlikely (errno != EAGAIN)) + if (unlikely (errno != EAGAIN)) { + sync.unlock (); return -1; + } if (timeout > 0) { timeout = (int) (end - clock.now_ms ()); if (timeout <= 0) { + sync.unlock (); errno = EAGAIN; return -1; } } } + + sync.unlock (); return 0; } int xs::socket_base_t::recv (msg_t *msg_, int flags_) { + sync.lock (); + // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } // Check whether message passed to the function is valid. if (unlikely (!msg_ || !msg_->check ())) { + sync.unlock (); errno = EFAULT; return -1; } // Get the message. int rc = xrecv (msg_, flags_); - if (unlikely (rc != 0 && errno != EAGAIN)) + if (unlikely (rc != 0 && errno != EAGAIN)) { + sync.unlock (); return -1; + } // Once every inbound_poll_rate messages check for signals and process // incoming commands. This happens only if we are not polling altogether @@ -572,6 +644,7 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // If we have the message, return immediately. if (rc == 0) { extract_flags (msg_); + sync.unlock (); return 0; } @@ -580,14 +653,20 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // activate_reader command already waiting int a command pipe. // If it's not, return EAGAIN. if (flags_ & XS_DONTWAIT || options.rcvtimeo == 0) { - if (unlikely (process_commands (0, false) != 0)) + if (unlikely (process_commands (0, false) != 0)) { + sync.unlock (); return -1; + } ticks = 0; rc = xrecv (msg_, flags_); - if (rc < 0) + if (rc < 0) { + sync.unlock (); return rc; + } + extract_flags (msg_); + sync.unlock (); return 0; } @@ -601,19 +680,24 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // we are able to fetch a message. bool block = (ticks != 0); while (true) { - if (unlikely (process_commands (block ? timeout : 0, false) != 0)) + if (unlikely (process_commands (block ? timeout : 0, false) != 0)) { + sync.unlock (); return -1; + } rc = xrecv (msg_, flags_); if (rc == 0) { ticks = 0; break; } - if (unlikely (errno != EAGAIN)) + if (unlikely (errno != EAGAIN)) { + sync.unlock (); return -1; + } block = true; if (timeout > 0) { timeout = (int) (end - clock.now_ms ()); if (timeout <= 0) { + sync.unlock (); errno = EAGAIN; return -1; } @@ -621,11 +705,14 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) } extract_flags (msg_); + sync.unlock (); return 0; } int xs::socket_base_t::close () { + sync.lock (); + // Mark the socket as dead. tag = 0xdeadbeef; @@ -634,17 +721,24 @@ int xs::socket_base_t::close () // process. send_reap (this); + sync.unlock (); return 0; } bool xs::socket_base_t::has_in () { - return xhas_in (); + sync.lock (); + bool ret = xhas_in (); + sync.unlock (); + return ret; } bool xs::socket_base_t::has_out () { - return xhas_out (); + sync.lock (); + bool ret = xhas_out (); + sync.unlock (); + return ret; } void xs::socket_base_t::start_reaping (poller_base_t *poller_) diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 1e35ffa..850586e 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -141,6 +141,10 @@ namespace xs // Used to check whether the object is a socket. uint32_t tag; + // Synchronisation of access to the socket. If Crossroads are running + // in non-reentrant mode, it is a dummy mutex-like object. + mutex_t sync; + // If true, associated context was already terminated. bool ctx_terminated; -- cgit v1.2.3