From 7c2dfc65b15f62751470778284b325734d0a241b Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 16 Feb 2012 10:09:36 +0900 Subject: Socket re-entrancy rewritten The inspiration for this re-write came form John Skaller's patch. Adding him to Credits section of the AUTHORS file. Signed-off-by: Martin Sustrik --- src/mutex.hpp | 53 ++++++++------------- src/socket_base.cpp | 133 ++++++++++++---------------------------------------- src/socket_base.hpp | 5 ++ src/xs.cpp | 48 +++++++++++++------ 4 files changed, 89 insertions(+), 150 deletions(-) (limited to 'src') diff --git a/src/mutex.hpp b/src/mutex.hpp index fc75bed..c0a2cc9 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -37,34 +37,28 @@ namespace xs class mutex_t { public: - inline mutex_t (bool fake_ = false) : - fake (fake_) + inline mutex_t () { - if (!fake) - InitializeCriticalSection (&cs); + InitializeCriticalSection (&cs); } inline ~mutex_t () { - if (!fake) - DeleteCriticalSection (&cs); + DeleteCriticalSection (&cs); } inline void lock () { - if (!fake) - EnterCriticalSection (&cs); + EnterCriticalSection (&cs); } inline void unlock () { - if (!fake) - LeaveCriticalSection (&cs); + LeaveCriticalSection (&cs); } private: - bool fake; CRITICAL_SECTION cs; // Disable copy construction and assignment. @@ -84,46 +78,36 @@ namespace xs class mutex_t { public: - inline mutex_t (bool fake_ = false) : - fake (fake_) + inline mutex_t () { - if (!fake) { - int rc = pthread_mutex_init (&mutex, NULL); - if (rc) - posix_assert (rc); - } + int rc = pthread_mutex_init (&mutex, NULL); + if (rc) + posix_assert (rc); } inline ~mutex_t () { - if (!fake) { - int rc = pthread_mutex_destroy (&mutex); - if (rc) - posix_assert (rc); - } + int rc = pthread_mutex_destroy (&mutex); + if (rc) + posix_assert (rc); } inline void lock () { - if (!fake) { - int rc = pthread_mutex_lock (&mutex); - if (rc) - posix_assert (rc); - } + int rc = pthread_mutex_lock (&mutex); + if (rc) + posix_assert (rc); } inline void unlock () { - if (!fake) { - int rc = pthread_mutex_unlock (&mutex); - if (rc) - posix_assert (rc); - } + int rc = pthread_mutex_unlock (&mutex); + if (rc) + posix_assert (rc); } private: - bool fake; pthread_mutex_t mutex; // Disable copy construction and assignment. @@ -136,3 +120,4 @@ namespace xs #endif #endif + diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fb387db..97ac4b4 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -117,7 +117,7 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_, xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : own_t (parent_, tid_), tag (0xbaddecaf), - sync (!parent_->is_reentrant ()), + reentrant (parent_->is_reentrant ()), ctx_terminated (false), destroyed (false), last_tsc (0), @@ -146,6 +146,18 @@ void xs::socket_base_t::stop () send_stop (); } +void xs::socket_base_t::lock () +{ + if (reentrant) + sync.lock (); +} + +void xs::socket_base_t::unlock () +{ + if (reentrant) + sync.unlock (); +} + int xs::socket_base_t::parse_uri (const char *uri_, std::string &protocol_, std::string &address_) { @@ -227,74 +239,58 @@ void xs::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_) int xs::socket_base_t::setsockopt (int option_, const void *optval_, size_t optvallen_) { - sync.lock (); - if (unlikely (ctx_terminated)) { - sync.unlock (); errno = ETERM; return -1; } // First, check whether specific socket type overloads the option. int rc = xsetsockopt (option_, optval_, optvallen_); - if (rc == 0 || errno != EINVAL) { - sync.unlock (); + if (rc == 0 || errno != EINVAL) return rc; - } // If the socket type doesn't support the option, pass it to // the generic option parser. rc = options.setsockopt (option_, optval_, optvallen_); - sync.unlock (); return rc; } int xs::socket_base_t::getsockopt (int option_, void *optval_, size_t *optvallen_) { - sync.lock (); - if (unlikely (ctx_terminated)) { - sync.unlock (); errno = ETERM; return -1; } if (option_ == XS_RCVMORE) { if (*optvallen_ < sizeof (int)) { - sync.unlock (); errno = EINVAL; return -1; } *((int*) optval_) = rcvmore ? 1 : 0; *optvallen_ = sizeof (int); - sync.unlock (); return 0; } if (option_ == XS_FD) { if (*optvallen_ < sizeof (fd_t)) { errno = EINVAL; - sync.unlock (); return -1; } *((fd_t*) optval_) = mailbox.get_fd (); *optvallen_ = sizeof (fd_t); - sync.unlock (); return 0; } if (option_ == XS_EVENTS) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; - sync.unlock (); return -1; } int rc = process_commands (0, false); - if (rc != 0 && (errno == EINTR || errno == ETERM)) { - sync.unlock (); + if (rc != 0 && (errno == EINTR || errno == ETERM)) return -1; - } errno_assert (rc == 0); *((int*) optval_) = 0; if (has_out ()) @@ -302,7 +298,6 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_, if (has_in ()) *((int*) optval_) |= XS_POLLIN; *optvallen_ = sizeof (int); - sync.unlock (); return 0; } @@ -311,10 +306,7 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_, int xs::socket_base_t::bind (const char *addr_) { - sync.lock (); - if (unlikely (ctx_terminated)) { - sync.unlock (); errno = ETERM; return -1; } @@ -323,21 +315,16 @@ int xs::socket_base_t::bind (const char *addr_) std::string protocol; std::string address; int rc = parse_uri (addr_, protocol, address); - if (rc != 0) { - sync.unlock (); + if (rc != 0) return -1; - } rc = check_protocol (protocol); - if (rc != 0) { - sync.unlock (); + if (rc != 0) return -1; - } if (protocol == "inproc") { endpoint_t endpoint = {this, options}; rc = register_endpoint (addr_, endpoint); - sync.unlock (); return rc; } @@ -346,7 +333,6 @@ int xs::socket_base_t::bind (const char *addr_) // For convenience's sake, bind can be used interchageable with // connect for PGM and EPGM transports. rc = connect (addr_); - sync.unlock (); return rc; } @@ -354,7 +340,6 @@ int xs::socket_base_t::bind (const char *addr_) // point we'll choose one. io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { - sync.unlock (); errno = EMTHREAD; return -1; } @@ -366,11 +351,9 @@ int xs::socket_base_t::bind (const char *addr_) int rc = listener->set_address (address.c_str ()); if (rc != 0) { delete listener; - sync.unlock (); return -1; } launch_child (listener); - sync.unlock (); return 0; } @@ -382,11 +365,9 @@ int xs::socket_base_t::bind (const char *addr_) int rc = listener->set_address (address.c_str ()); if (rc != 0) { delete listener; - sync.unlock (); return -1; } launch_child (listener); - sync.unlock (); return 0; } #endif @@ -397,10 +378,7 @@ int xs::socket_base_t::bind (const char *addr_) int xs::socket_base_t::connect (const char *addr_) { - sync.lock (); - if (unlikely (ctx_terminated)) { - sync.unlock (); errno = ETERM; return -1; } @@ -409,16 +387,12 @@ int xs::socket_base_t::connect (const char *addr_) std::string protocol; std::string address; int rc = parse_uri (addr_, protocol, address); - if (rc != 0) { - sync.unlock (); + if (rc != 0) return -1; - } rc = check_protocol (protocol); - if (rc != 0) { - sync.unlock (); + if (rc != 0) return -1; - } if (protocol == "inproc") { @@ -428,10 +402,8 @@ int xs::socket_base_t::connect (const char *addr_) // Find the peer endpoint. endpoint_t peer = find_endpoint (addr_); - if (!peer.socket) { - sync.unlock (); + if (!peer.socket) return -1; - } // The total HWM for an inproc connection should be the sum of // the binder's HWM and the connector's HWM. @@ -473,14 +445,12 @@ int xs::socket_base_t::connect (const char *addr_) // increased here. send_bind (peer.socket, pipes [1], false); - sync.unlock (); return 0; } // Choose the I/O thread to run the session in. io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { - sync.unlock (); errno = EMTHREAD; return -1; } @@ -513,34 +483,27 @@ int xs::socket_base_t::connect (const char *addr_) // Activate the session. Make it a child of this socket. launch_child (session); - sync.unlock (); return 0; } int xs::socket_base_t::send (msg_t *msg_, int flags_) { - sync.lock (); - // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { - sync.unlock (); errno = ETERM; return -1; } // Check whether message passed to the function is valid. if (unlikely (!msg_ || !msg_->check ())) { - sync.unlock (); errno = EFAULT; return -1; } // Process pending commands, if any. int rc = process_commands (0, true); - if (unlikely (rc != 0)) { - sync.unlock (); + if (unlikely (rc != 0)) return -1; - } // Clear any user-visible flags that are set on the message. msg_->reset_flags (msg_t::more); @@ -551,21 +514,15 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_) // Try to send the message. rc = xsend (msg_, flags_); - if (rc == 0) { - sync.unlock (); + if (rc == 0) return 0; - } - if (unlikely (errno != EAGAIN)) { - sync.unlock (); + if (unlikely (errno != EAGAIN)) return -1; - } // In case of non-blocking send we'll simply propagate // the error - including EAGAIN - up the stack. - if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) { - sync.unlock (); + if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) return -1; - } // Compute the time when the timeout should occur. // If the timeout is infite, don't care. @@ -577,55 +534,43 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_) // command, process it and try to send the message again. // If timeout is reached in the meantime, return EAGAIN. while (true) { - if (unlikely (process_commands (timeout, false) != 0)) { - sync.unlock (); + if (unlikely (process_commands (timeout, false) != 0)) return -1; - } rc = xsend (msg_, flags_); if (rc == 0) break; - if (unlikely (errno != EAGAIN)) { - sync.unlock (); + if (unlikely (errno != EAGAIN)) return -1; - } if (timeout > 0) { timeout = (int) (end - clock.now_ms ()); if (timeout <= 0) { - sync.unlock (); errno = EAGAIN; return -1; } } } - sync.unlock (); return 0; } int xs::socket_base_t::recv (msg_t *msg_, int flags_) { - sync.lock (); - // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { - sync.unlock (); errno = ETERM; return -1; } // Check whether message passed to the function is valid. if (unlikely (!msg_ || !msg_->check ())) { - sync.unlock (); errno = EFAULT; return -1; } // Get the message. int rc = xrecv (msg_, flags_); - if (unlikely (rc != 0 && errno != EAGAIN)) { - sync.unlock (); + if (unlikely (rc != 0 && errno != EAGAIN)) return -1; - } // Once every inbound_poll_rate messages check for signals and process // incoming commands. This happens only if we are not polling altogether @@ -644,7 +589,6 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // If we have the message, return immediately. if (rc == 0) { extract_flags (msg_); - sync.unlock (); return 0; } @@ -653,20 +597,15 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // activate_reader command already waiting int a command pipe. // If it's not, return EAGAIN. if (flags_ & XS_DONTWAIT || options.rcvtimeo == 0) { - if (unlikely (process_commands (0, false) != 0)) { - sync.unlock (); + if (unlikely (process_commands (0, false) != 0)) return -1; - } ticks = 0; rc = xrecv (msg_, flags_); - if (rc < 0) { - sync.unlock (); + if (rc < 0) return rc; - } extract_flags (msg_); - sync.unlock (); return 0; } @@ -680,24 +619,19 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // we are able to fetch a message. bool block = (ticks != 0); while (true) { - if (unlikely (process_commands (block ? timeout : 0, false) != 0)) { - sync.unlock (); + if (unlikely (process_commands (block ? timeout : 0, false) != 0)) return -1; - } rc = xrecv (msg_, flags_); if (rc == 0) { ticks = 0; break; } - if (unlikely (errno != EAGAIN)) { - sync.unlock (); + if (unlikely (errno != EAGAIN)) return -1; - } block = true; if (timeout > 0) { timeout = (int) (end - clock.now_ms ()); if (timeout <= 0) { - sync.unlock (); errno = EAGAIN; return -1; } @@ -705,7 +639,6 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) } extract_flags (msg_); - sync.unlock (); return 0; } @@ -724,17 +657,13 @@ int xs::socket_base_t::close () bool xs::socket_base_t::has_in () { - sync.lock (); bool ret = xhas_in (); - sync.unlock (); return ret; } bool xs::socket_base_t::has_out () { - sync.lock (); bool ret = xhas_out (); - sync.unlock (); return ret; } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 850586e..8b9a948 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -65,6 +65,10 @@ namespace xs // This function can be called from a different thread! void stop (); + // Synchronise access of application threads to the socket. + void lock (); + void unlock (); + // Interface for communication with the API layer. int setsockopt (int option_, const void *optval_, size_t optvallen_); int getsockopt (int option_, void *optval_, size_t *optvallen_); @@ -143,6 +147,7 @@ namespace xs // Synchronisation of access to the socket. If Crossroads are running // in non-reentrant mode, it is a dummy mutex-like object. + bool reentrant; mutex_t sync; // If true, associated context was already terminated. diff --git a/src/xs.cpp b/src/xs.cpp index ad5bd97..7160da9 100644 --- a/src/xs.cpp +++ b/src/xs.cpp @@ -202,40 +202,54 @@ int xs_close (void *s_) int xs_setsockopt (void *s_, int option_, const void *optval_, size_t optvallen_) { - if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) { + xs::socket_base_t *s = (xs::socket_base_t*) s_; + if (!s || !s->check_tag ()) { errno = ENOTSOCK; return -1; } - return (((xs::socket_base_t*) s_)->setsockopt (option_, optval_, - optvallen_)); + s->lock (); + int rc = s->setsockopt (option_, optval_, optvallen_); + s->unlock (); + return rc; } int xs_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_) { - if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) { + xs::socket_base_t *s = (xs::socket_base_t*) s_; + if (!s || !s->check_tag ()) { errno = ENOTSOCK; return -1; } - return (((xs::socket_base_t*) s_)->getsockopt (option_, optval_, - optvallen_)); + s->lock (); + int rc = s->getsockopt (option_, optval_, optvallen_); + s->unlock (); + return rc; } int xs_bind (void *s_, const char *addr_) { - if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) { + xs::socket_base_t *s = (xs::socket_base_t*) s_; + if (!s || !s->check_tag ()) { errno = ENOTSOCK; return -1; } - return (((xs::socket_base_t*) s_)->bind (addr_)); + s->lock (); + int rc = s->bind (addr_); + s->unlock (); + return rc; } int xs_connect (void *s_, const char *addr_) { - if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) { + xs::socket_base_t *s = (xs::socket_base_t*) s_; + if (!s || !s->check_tag ()) { errno = ENOTSOCK; return -1; } - return (((xs::socket_base_t*) s_)->connect (addr_)); + s->lock (); + int rc = s->connect (addr_); + s->unlock (); + return rc; } int xs_send (void *s_, const void *buf_, size_t len_, int flags_) @@ -288,12 +302,15 @@ int xs_recv (void *s_, void *buf_, size_t len_, int flags_) int xs_sendmsg (void *s_, xs_msg_t *msg_, int flags_) { - if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) { + xs::socket_base_t *s = (xs::socket_base_t*) s_; + if (!s || !s->check_tag ()) { errno = ENOTSOCK; return -1; } int sz = (int) xs_msg_size (msg_); - int rc = (((xs::socket_base_t*) s_)->send ((xs::msg_t*) msg_, flags_)); + s->lock (); + int rc = s->send ((xs::msg_t*) msg_, flags_); + s->unlock (); if (unlikely (rc < 0)) return -1; return sz; @@ -301,11 +318,14 @@ int xs_sendmsg (void *s_, xs_msg_t *msg_, int flags_) int xs_recvmsg (void *s_, xs_msg_t *msg_, int flags_) { - if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) { + xs::socket_base_t *s = (xs::socket_base_t*) s_; + if (!s || !s->check_tag ()) { errno = ENOTSOCK; return -1; } - int rc = (((xs::socket_base_t*) s_)->recv ((xs::msg_t*) msg_, flags_)); + s->lock (); + int rc = s->recv ((xs::msg_t*) msg_, flags_); + s->unlock (); if (unlikely (rc < 0)) return -1; return (int) xs_msg_size (msg_); -- cgit v1.2.3