From 1e01248efc113cc9389f795157400a634730823e Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 16 Feb 2012 10:08:43 +0900 Subject: XS_CTX_REENTRANT option added Signed-off-by: Martin Sustrik --- src/socket_base.cpp | 140 +++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 117 insertions(+), 23 deletions(-) (limited to 'src/socket_base.cpp') diff --git a/src/socket_base.cpp b/src/socket_base.cpp index f3ae291..718263f 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -117,6 +117,7 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_, xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : own_t (parent_, tid_), tag (0xbaddecaf), + sync (!parent_->is_reentrant ()), ctx_terminated (false), destroyed (false), last_tsc (0), @@ -226,57 +227,74 @@ void xs::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_) int xs::socket_base_t::setsockopt (int option_, const void *optval_, size_t optvallen_) { + sync.lock (); + if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } // First, check whether specific socket type overloads the option. int rc = xsetsockopt (option_, optval_, optvallen_); - if (rc == 0 || errno != EINVAL) + if (rc == 0 || errno != EINVAL) { + sync.unlock (); return rc; + } // If the socket type doesn't support the option, pass it to // the generic option parser. - return options.setsockopt (option_, optval_, optvallen_); + rc = options.setsockopt (option_, optval_, optvallen_); + sync.unlock (); + return rc; } int xs::socket_base_t::getsockopt (int option_, void *optval_, size_t *optvallen_) { + sync.lock (); + if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } if (option_ == XS_RCVMORE) { if (*optvallen_ < sizeof (int)) { + sync.unlock (); errno = EINVAL; return -1; } *((int*) optval_) = rcvmore ? 1 : 0; *optvallen_ = sizeof (int); + sync.unlock (); return 0; } if (option_ == XS_FD) { if (*optvallen_ < sizeof (fd_t)) { errno = EINVAL; + sync.unlock (); return -1; } *((fd_t*) optval_) = mailbox.get_fd (); *optvallen_ = sizeof (fd_t); + sync.unlock (); return 0; } if (option_ == XS_EVENTS) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; + sync.unlock (); return -1; } int rc = process_commands (0, false); - if (rc != 0 && (errno == EINTR || errno == ETERM)) + if (rc != 0 && (errno == EINTR || errno == ETERM)) { + sync.unlock (); return -1; + } errno_assert (rc == 0); *((int*) optval_) = 0; if (has_out ()) @@ -284,6 +302,7 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_, if (has_in ()) *((int*) optval_) |= XS_POLLIN; *optvallen_ = sizeof (int); + sync.unlock (); return 0; } @@ -292,7 +311,10 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_, int xs::socket_base_t::bind (const char *addr_) { + sync.lock (); + if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } @@ -301,29 +323,38 @@ int xs::socket_base_t::bind (const char *addr_) std::string protocol; std::string address; int rc = parse_uri (addr_, protocol, address); - if (rc != 0) + if (rc != 0) { + sync.unlock (); return -1; + } rc = check_protocol (protocol); - if (rc != 0) + if (rc != 0) { + sync.unlock (); return -1; + } if (protocol == "inproc") { endpoint_t endpoint = {this, options}; - return register_endpoint (addr_, endpoint); + rc = register_endpoint (addr_, endpoint); + sync.unlock (); + return rc; } if (protocol == "pgm" || protocol == "epgm") { // For convenience's sake, bind can be used interchageable with // connect for PGM and EPGM transports. - return connect (addr_); + rc = connect (addr_); + sync.unlock (); + return rc; } // Remaining trasnports require to be run in an I/O thread, so at this // point we'll choose one. io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { + sync.unlock (); errno = EMTHREAD; return -1; } @@ -335,9 +366,11 @@ int xs::socket_base_t::bind (const char *addr_) int rc = listener->set_address (address.c_str ()); if (rc != 0) { delete listener; + sync.unlock (); return -1; } launch_child (listener); + sync.unlock (); return 0; } @@ -349,9 +382,11 @@ int xs::socket_base_t::bind (const char *addr_) int rc = listener->set_address (address.c_str ()); if (rc != 0) { delete listener; + sync.unlock (); return -1; } launch_child (listener); + sync.unlock (); return 0; } #endif @@ -362,7 +397,10 @@ int xs::socket_base_t::bind (const char *addr_) int xs::socket_base_t::connect (const char *addr_) { + sync.lock (); + if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } @@ -371,12 +409,16 @@ int xs::socket_base_t::connect (const char *addr_) std::string protocol; std::string address; int rc = parse_uri (addr_, protocol, address); - if (rc != 0) + if (rc != 0) { + sync.unlock (); return -1; + } rc = check_protocol (protocol); - if (rc != 0) + if (rc != 0) { + sync.unlock (); return -1; + } if (protocol == "inproc") { @@ -386,8 +428,10 @@ int xs::socket_base_t::connect (const char *addr_) // Find the peer endpoint. endpoint_t peer = find_endpoint (addr_); - if (!peer.socket) + if (!peer.socket) { + sync.unlock (); return -1; + } // The total HWM for an inproc connection should be the sum of // the binder's HWM and the connector's HWM. @@ -429,12 +473,14 @@ int xs::socket_base_t::connect (const char *addr_) // increased here. send_bind (peer.socket, pipes [1], false); + sync.unlock (); return 0; } // Choose the I/O thread to run the session in. io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { + sync.unlock (); errno = EMTHREAD; return -1; } @@ -467,27 +513,34 @@ int xs::socket_base_t::connect (const char *addr_) // Activate the session. Make it a child of this socket. launch_child (session); + sync.unlock (); return 0; } int xs::socket_base_t::send (msg_t *msg_, int flags_) { + sync.lock (); + // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } // Check whether message passed to the function is valid. if (unlikely (!msg_ || !msg_->check ())) { + sync.unlock (); errno = EFAULT; return -1; } // Process pending commands, if any. int rc = process_commands (0, true); - if (unlikely (rc != 0)) + if (unlikely (rc != 0)) { + sync.unlock (); return -1; + } // Clear any user-visible flags that are set on the message. msg_->reset_flags (msg_t::more); @@ -498,15 +551,21 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_) // Try to send the message. rc = xsend (msg_, flags_); - if (rc == 0) + if (rc == 0) { + sync.unlock (); return 0; - if (unlikely (errno != EAGAIN)) + } + if (unlikely (errno != EAGAIN)) { + sync.unlock (); return -1; + } // In case of non-blocking send we'll simply propagate // the error - including EAGAIN - up the stack. - if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) + if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) { + sync.unlock (); return -1; + } // Compute the time when the timeout should occur. // If the timeout is infite, don't care. @@ -518,42 +577,55 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_) // command, process it and try to send the message again. // If timeout is reached in the meantime, return EAGAIN. while (true) { - if (unlikely (process_commands (timeout, false) != 0)) + if (unlikely (process_commands (timeout, false) != 0)) { + sync.unlock (); return -1; + } rc = xsend (msg_, flags_); if (rc == 0) break; - if (unlikely (errno != EAGAIN)) + if (unlikely (errno != EAGAIN)) { + sync.unlock (); return -1; + } if (timeout > 0) { timeout = (int) (end - clock.now_ms ()); if (timeout <= 0) { + sync.unlock (); errno = EAGAIN; return -1; } } } + + sync.unlock (); return 0; } int xs::socket_base_t::recv (msg_t *msg_, int flags_) { + sync.lock (); + // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { + sync.unlock (); errno = ETERM; return -1; } // Check whether message passed to the function is valid. if (unlikely (!msg_ || !msg_->check ())) { + sync.unlock (); errno = EFAULT; return -1; } // Get the message. int rc = xrecv (msg_, flags_); - if (unlikely (rc != 0 && errno != EAGAIN)) + if (unlikely (rc != 0 && errno != EAGAIN)) { + sync.unlock (); return -1; + } // Once every inbound_poll_rate messages check for signals and process // incoming commands. This happens only if we are not polling altogether @@ -572,6 +644,7 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // If we have the message, return immediately. if (rc == 0) { extract_flags (msg_); + sync.unlock (); return 0; } @@ -580,14 +653,20 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // activate_reader command already waiting int a command pipe. // If it's not, return EAGAIN. if (flags_ & XS_DONTWAIT || options.rcvtimeo == 0) { - if (unlikely (process_commands (0, false) != 0)) + if (unlikely (process_commands (0, false) != 0)) { + sync.unlock (); return -1; + } ticks = 0; rc = xrecv (msg_, flags_); - if (rc < 0) + if (rc < 0) { + sync.unlock (); return rc; + } + extract_flags (msg_); + sync.unlock (); return 0; } @@ -601,19 +680,24 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) // we are able to fetch a message. bool block = (ticks != 0); while (true) { - if (unlikely (process_commands (block ? timeout : 0, false) != 0)) + if (unlikely (process_commands (block ? timeout : 0, false) != 0)) { + sync.unlock (); return -1; + } rc = xrecv (msg_, flags_); if (rc == 0) { ticks = 0; break; } - if (unlikely (errno != EAGAIN)) + if (unlikely (errno != EAGAIN)) { + sync.unlock (); return -1; + } block = true; if (timeout > 0) { timeout = (int) (end - clock.now_ms ()); if (timeout <= 0) { + sync.unlock (); errno = EAGAIN; return -1; } @@ -621,11 +705,14 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_) } extract_flags (msg_); + sync.unlock (); return 0; } int xs::socket_base_t::close () { + sync.lock (); + // Mark the socket as dead. tag = 0xdeadbeef; @@ -634,17 +721,24 @@ int xs::socket_base_t::close () // process. send_reap (this); + sync.unlock (); return 0; } bool xs::socket_base_t::has_in () { - return xhas_in (); + sync.lock (); + bool ret = xhas_in (); + sync.unlock (); + return ret; } bool xs::socket_base_t::has_out () { - return xhas_out (); + sync.lock (); + bool ret = xhas_out (); + sync.unlock (); + return ret; } void xs::socket_base_t::start_reaping (poller_base_t *poller_) -- cgit v1.2.3