From 7c2dfc65b15f62751470778284b325734d0a241b Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 16 Feb 2012 10:09:36 +0900 Subject: Socket re-entrancy rewritten The inspiration for this re-write came form John Skaller's patch. Adding him to Credits section of the AUTHORS file. Signed-off-by: Martin Sustrik --- src/socket_base.cpp | 133 ++++++++++++---------------------------------------- 1 file changed, 31 insertions(+), 102 deletions(-) (limited to 'src/socket_base.cpp') diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fb387db..97ac4b4 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -117,7 +117,7 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_, xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : own_t (parent_, tid_), tag (0xbaddecaf), - sync (!parent_->is_reentrant ()), + reentrant (parent_->is_reentrant ()), ctx_terminated (false), destroyed (false), last_tsc (0), @@ -146,6 +146,18 @@ void xs::socket_base_t::stop () send_stop (); } +void xs::socket_base_t::lock () +{ + if (reentrant) + sync.lock (); +} + +void xs::socket_base_t::unlock () +{ + if (reentrant) + sync.unlock (); +} + int xs::socket_base_t::parse_uri (const char *uri_, std::string &protocol_, std::string &address_) { @@ -227,74 +239,58 @@ void xs::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_) int xs::socket_base_t::setsockopt (int option_, const void *optval_, size_t optvallen_) { - sync.lock (); - if (unlikely (ctx_terminated)) { - sync.unlock (); errno = ETERM; return -1; } // First, check whether specific socket type overloads the option. int rc = xsetsockopt (option_, optval_, optvallen_); - if (rc == 0 || errno != EINVAL) { - sync.unlock (); + if (rc == 0 || errno != EINVAL) return rc; - } // If the socket type doesn't support the option, pass it to // the generic option parser. rc = options.setsockopt (option_, optval_, optvallen_); - sync.unlock (); return rc; } int xs::socket_base_t::getsockopt (int option_, void *optval_, size_t *optvallen_) { - sync.lock (); - if (unlikely (ctx_terminated)) { - sync.unlock (); errno = ETERM; return -1; } if (option_ == XS_RCVMORE) { if (*optvallen_ < sizeof (int)) { - sync.unlock (); errno = EINVAL; return -1; } *((int*) optval_) = rcvmore ? 1 : 0; *optvallen_ = sizeof (int); - sync.unlock (); return 0; } if (option_ == XS_FD) { if (*optvallen_ < sizeof (fd_t)) { errno = EINVAL; - sync.unlock (); return -1; } *((fd_t*) optval_) = mailbox.get_fd (); *optvallen_ = sizeof (fd_t); - sync.unlock (); return 0; } if (option_ == XS_EVENTS) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; - sync.unlock (); return -1; } int rc = process_commands (0, false); - if (rc != 0 && (errno == EINTR || errno == ETERM)) { - sync.unlock (); + if (rc != 0 && (errno == EINTR || errno == ETERM)) return -1; - } errno_assert (rc == 0); *((int*) optval_) = 0; if (has_out ()) @@ -302,7 +298,6 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_, if (has_in ()) *((int*) optval_) |= XS_POLLIN; *optvallen_ = sizeof (int); - sync.unlock (); return 0; } @@ -311,10 +306,7 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_, int xs::socket_base_t::bind (const char *addr_) { - sync.lock (); - if (unlikely (ctx_terminated)) { - sync.unlock (); errno = ETERM; return -1; } @@ -323,21 +315,16 @@ int xs::socket_base_t::bind (const char *addr_) std::string protocol; std::string address; int rc = parse_uri (addr_, protocol, address); - if (rc != 0) { - sync.unlock (); + if (rc != 0) return -1; - } rc = check_protocol (protocol); - if (rc != 0) { - sync.unlock (); + if (rc != 0) return -1; - } if (protocol == "inproc") { endpoint_t endpoint = {this, options}; rc = register_endpoint (addr_, endpoint); - sync.unlock (); return rc; } @@ -346,7 +333,6 @@ int xs::socket_base_t::bind (const char *addr_) // For convenience's sake, bind can be used interchageable with // connect for PGM and EPGM transports. rc = connect (addr_); - sync.unlock (); return rc; } @@ -354,7 +340,6 @@ int xs::socket_base_t::bind (const char *addr_) // point we'll choose one. io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { - sync.unlock (); errno = EMTHREAD; return -1; } @@ -366,11 +351,9 @@ int xs::socket_base_t::bind (const char *addr_) int rc = listener->set_address (address.c_str ()); if (rc != 0) { delete listener; - sync.unlock (); return -1; } launch_child (listener); - sync.unlock (); return 0; } @@ -382,11 +365,9 @@ int xs::socket_base_t::bind (const char *addr_) int rc = listener->set_address (address.c_str ()); if (rc != 0) { delete listener; - sync.unlock (); return -1; } launch_child (listener); - sync.unlock (); return 0; } #endif @@ -397,10 +378,7 @@ int xs::socket_base_t::bind (const char *addr_) int xs::socket_base_t::connect (const char *addr_) { - sync.lock (); - if (unlikely (ctx_terminated)) { - sync.unlock (); errno = ETERM; return -1; } @@ -409,16 +387,12 @@ int xs::socket_base_t::connect (const char *addr_) std::string protocol; std::string address; int rc = parse_uri (addr_, protocol, address); - if (rc != 0) { - sync.unlock (); + if (rc != 0) return -1; - } rc = check_protocol (protocol); - if (rc != 0) { - sync.unlock (); + if (rc != 0) return -1; - } if (protocol == "inproc") { @@ -428,10 +402,8 @@ int xs::socket_base_t::connect (const char *addr_) // Find the peer endpoint. endpoint_t peer = find_endpoint (addr_); - if (!peer.socket) { - sync.unlock (); + if (!peer.socket) return -1; - } // The total HWM for an inproc connection should be the sum of // the binder's HWM and the connector's HWM. @@ -473,14 +445,12 @@ int xs::socket_base_t::connect (const char *addr_) // increased here. send_bind (peer.socket, pipes [1], false); - sync.unlock (); return 0; } // Choose the I/O thread to run the session in. io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { - sync.unlock (); errno = EMTHREAD; return -1; } @@ -513,34 +483,27 @@ int xs::socket_base_t::connect (const char *addr_) // Activate the session. Make it a child of this socket. launch_child (session); - sync.unlock (); return 0; } int xs::socket_base_t::send (msg_t *msg_, int flags_) { - sync.lock (); - // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { - sync.unlock (); errno = ETERM; return -1; } // Check whether message passed to the function is valid. if (unlikely (!msg_ || !msg_->check ())) { - sync.unlock (); errno = EFAULT; return -1; } // Process pending commands, if any. int rc = process_commands (0, true); - if (unlikely (rc != 0)) { - sync.unlock (); + if (unlikely (rc != 0)) return -1; - } // Clear any user-visible flags that are set on the message. msg_->reset_flags (msg_t::more); @@ -551,21 +514,15 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_) // Try to send the message. rc = xsend (msg_, flags_); - if (rc == 0) { - sync.unlock (); + if (rc == 0) return 0; - } - if (unlikely (errno != EAGAIN)) { - sync.unlock (); + if (unlikely (errno != EAGAIN)) return -1; - } // In case of non-blocking send we'll simply propagate // the error - including EAGAIN - up the stack. - if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) { - sync.unlock (); + if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) return -1; - } // Compute the time when the timeout should occur. // If the timeout is infite, don't care. @@ -577,55 +534,43 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_) // command, process it and try to send the message again. // If timeout is reached in the meantime, return EAGAIN. while (true) { - if (unlikely (process_commands (timeout, false) != 0)) { - sync.unlock (); + if (unlikely (process_commands (timeout, false) != 0)) return -1; - } rc = xsend (msg_, flags_); if (rc == 0) break; - if (unlikely (errno != EAGAIN)) { - sync.unlock (); + if (unlikely (errno != EAGAIN)) return -1; - } if (timeout > 0) { timeout = (int) (end - clock.now_ms ()); if (timeout <= 0) { - sync.unlock (); errno = EAGAIN; return -1; } } } - sync.unlock (); return 0; } int xs::socket_base_t::recv (msg_t *msg_, int flags_) { - sync.lock (); - // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { - sync.unlock (); errno = ETERM; return -1; } // Check whether message passed to the function is valid. if (unlikely (!msg_ || !msg_->check ())) { - sync.unlock (); errno = EFAULT; return -1; } // Get the message. int rc = xrecv (msg_, flags_); - if (unlikely (rc != 0 && errno != EAGAIN)) { - sync.unlock (); + if (unlikely (rc != 0 && errno != EAGAIN)) return -1; - } // Once every inbound_poll_rate messages check for signals and process // incoming commands. This happens only if we are not polling altogether @@ -644,7 +589,6 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // If we have the message, return immediately. if (rc == 0) { extract_flags (msg_); - sync.unlock (); return 0; } @@ -653,20 +597,15 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // activate_reader command already waiting int a command pipe. // If it's not, return EAGAIN. if (flags_ & XS_DONTWAIT || options.rcvtimeo == 0) { - if (unlikely (process_commands (0, false) != 0)) { - sync.unlock (); + if (unlikely (process_commands (0, false) != 0)) return -1; - } ticks = 0; rc = xrecv (msg_, flags_); - if (rc < 0) { - sync.unlock (); + if (rc < 0) return rc; - } extract_flags (msg_); - sync.unlock (); return 0; } @@ -680,24 +619,19 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // we are able to fetch a message. bool block = (ticks != 0); while (true) { - if (unlikely (process_commands (block ? timeout : 0, false) != 0)) { - sync.unlock (); + if (unlikely (process_commands (block ? timeout : 0, false) != 0)) return -1; - } rc = xrecv (msg_, flags_); if (rc == 0) { ticks = 0; break; } - if (unlikely (errno != EAGAIN)) { - sync.unlock (); + if (unlikely (errno != EAGAIN)) return -1; - } block = true; if (timeout > 0) { timeout = (int) (end - clock.now_ms ()); if (timeout <= 0) { - sync.unlock (); errno = EAGAIN; return -1; } @@ -705,7 +639,6 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) } extract_flags (msg_); - sync.unlock (); return 0; } @@ -724,17 +657,13 @@ int xs::socket_base_t::close () bool xs::socket_base_t::has_in () { - sync.lock (); bool ret = xhas_in (); - sync.unlock (); return ret; } bool xs::socket_base_t::has_out () { - sync.lock (); bool ret = xhas_out (); - sync.unlock (); return ret; } -- cgit v1.2.3