diff options
| -rw-r--r-- | .gitignore | 1 | ||||
| -rw-r--r-- | doc/xs_setctxopt.txt | 16 | ||||
| -rw-r--r-- | include/xs.h | 1 | ||||
| -rw-r--r-- | src/ctx.cpp | 21 | ||||
| -rw-r--r-- | src/ctx.hpp | 6 | ||||
| -rw-r--r-- | src/mutex.hpp | 52 | ||||
| -rw-r--r-- | src/socket_base.cpp | 140 | ||||
| -rw-r--r-- | src/socket_base.hpp | 4 | ||||
| -rw-r--r-- | tests/Makefile.am | 4 | ||||
| -rw-r--r-- | tests/reentrant.cpp | 53 | 
10 files changed, 255 insertions, 43 deletions
@@ -37,6 +37,7 @@ tests/msg_flags  tests/reconnect  tests/linger  tests/max_sockets +tests/reentrant  src/platform.hpp*  src/stamp-h1  perf/local_lat diff --git a/doc/xs_setctxopt.txt b/doc/xs_setctxopt.txt index be9e654..bc7d018 100644 --- a/doc/xs_setctxopt.txt +++ b/doc/xs_setctxopt.txt @@ -34,6 +34,22 @@ Option value type:: int  Option value unit:: sockets  Default value:: 512 +XS_CTX_REENTRANT: Specify whether sockets should be thread-safe +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +If 'XS_CTX_REENTRANT' option is set to 1 it is safe to access single Crossroads +socket from multiple threads in parallel. If it is set to 0 it can be accessed +by at most one thread at any single point of time. + +Note: By default Crossroads sockets are non-reentrant. If possible, try to use +the socket from the thread it was created in. If communication between threads +is needed use inproc transport. Not following this advice can introduce +scalability problems. + +[horizontal] +Option value type:: int +Option value unit:: boolean +Default value:: 0 +  RETURN VALUE  ------------  The _xs_setctxopt()_ function shall return zero if successful. Otherwise it diff --git a/include/xs.h b/include/xs.h index 3aa1045..4d0be57 100644 --- a/include/xs.h +++ b/include/xs.h @@ -147,6 +147,7 @@ XS_EXPORT int xs_getmsgopt (xs_msg_t *msg, int option, void *optval,  /******************************************************************************/  #define XS_CTX_MAX_SOCKETS 1 +#define XS_CTX_REENTRANT 2  XS_EXPORT void *xs_init (int io_threads);  XS_EXPORT int xs_term (void *context); diff --git a/src/ctx.cpp b/src/ctx.cpp index b6298e2..917c3e6 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -43,7 +43,8 @@ xs::ctx_t::ctx_t (uint32_t io_threads_) :      starting (true),      terminating (false),      max_sockets (512), -    io_thread_count (io_threads_) +    io_thread_count (io_threads_), +    reentrant (false)  {  } @@ -147,6 +148,16 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_)          max_sockets = *((int*) optval_);          opt_sync.unlock ();          break; +    case XS_CTX_REENTRANT: +        if (optvallen_ != sizeof (int) || (*((int*) optval_) != 0 && +              *((int*) optval_) != 1)) { +            errno = EINVAL; +            return -1; +        } +        opt_sync.lock (); +        reentrant = (*((int*) optval_) ? true : false); +        opt_sync.unlock (); +        break;      default:          errno = EINVAL;          return -1; @@ -379,6 +390,14 @@ void xs::ctx_t::publish_logs (const char *text_)      log_sync.unlock ();  } +bool xs::ctx_t::is_reentrant () +{ +    opt_sync.lock (); +    bool ret = reentrant; +    opt_sync.unlock (); +    return ret; +} +  //  The last used socket ID, or 0 if no socket was used so far. Note that this  //  is a global variable. Thus, even sockets created in different contexts have  //  unique IDs. diff --git a/src/ctx.hpp b/src/ctx.hpp index 56b5d4c..e912443 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -100,6 +100,9 @@ namespace xs          void log (int sid_, const char *text_);          void publish_logs (const char *text_); +        //  True, if API is expected to be reentrant. +        bool is_reentrant (); +          enum {              term_tid = 0,              reaper_tid = 1 @@ -173,6 +176,9 @@ namespace xs          //  Number of I/O threads to launch.          uint32_t io_thread_count; +        //  True, if API is expected to be reentrant. +        bool reentrant; +          //  Synchronisation of access to context options.          mutex_t opt_sync; diff --git a/src/mutex.hpp b/src/mutex.hpp index 118b5ef..fc75bed 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -37,28 +37,34 @@ namespace xs      class mutex_t      {      public: -        inline mutex_t () +        inline mutex_t (bool fake_ = false) : +            fake (fake_)          { -            InitializeCriticalSection (&cs); +            if (!fake) +                InitializeCriticalSection (&cs);          }          inline ~mutex_t ()          { -            DeleteCriticalSection (&cs); +            if (!fake) +                DeleteCriticalSection (&cs);          }          inline void lock ()          { -            EnterCriticalSection (&cs); +            if (!fake) +                EnterCriticalSection (&cs);          }          inline void unlock ()          { -            LeaveCriticalSection (&cs); +            if (!fake) +                LeaveCriticalSection (&cs);          }      private: +        bool fake;          CRITICAL_SECTION cs;          //  Disable copy construction and assignment. @@ -78,36 +84,46 @@ namespace xs      class mutex_t      {      public: -        inline mutex_t () +        inline mutex_t (bool fake_ = false) : +            fake (fake_)          { -            int rc = pthread_mutex_init (&mutex, NULL); -            if (rc) -                posix_assert (rc); +            if (!fake) { +                int rc = pthread_mutex_init (&mutex, NULL); +                if (rc) +                    posix_assert (rc); +            }          }          inline ~mutex_t ()          { -            int rc = pthread_mutex_destroy (&mutex); -            if (rc) -                posix_assert (rc); +            if (!fake) { +                int rc = pthread_mutex_destroy (&mutex); +                if (rc) +                    posix_assert (rc); +            }          }          inline void lock ()          { -            int rc = pthread_mutex_lock (&mutex); -            if (rc) -                posix_assert (rc); +            if (!fake) { +                int rc = pthread_mutex_lock (&mutex); +                if (rc) +                    posix_assert (rc); +            }          }          inline void unlock ()          { -            int rc = pthread_mutex_unlock (&mutex); -            if (rc) -                posix_assert (rc); +            if (!fake) { +                int rc = pthread_mutex_unlock (&mutex); +                if (rc) +                    posix_assert (rc); +            }          }      private: +        bool fake;          pthread_mutex_t mutex;          // Disable copy construction and assignment. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index f3ae291..718263f 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -117,6 +117,7 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_,  xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :      own_t (parent_, tid_),      tag (0xbaddecaf), +    sync (!parent_->is_reentrant ()),      ctx_terminated (false),      destroyed (false),      last_tsc (0), @@ -226,57 +227,74 @@ void xs::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_)  int xs::socket_base_t::setsockopt (int option_, const void *optval_,      size_t optvallen_)  { +    sync.lock (); +      if (unlikely (ctx_terminated)) { +        sync.unlock ();          errno = ETERM;          return -1;      }      //  First, check whether specific socket type overloads the option.      int rc = xsetsockopt (option_, optval_, optvallen_); -    if (rc == 0 || errno != EINVAL) +    if (rc == 0 || errno != EINVAL) { +        sync.unlock ();          return rc; +    }      //  If the socket type doesn't support the option, pass it to      //  the generic option parser. -    return options.setsockopt (option_, optval_, optvallen_); +    rc = options.setsockopt (option_, optval_, optvallen_); +    sync.unlock (); +    return rc;  }  int xs::socket_base_t::getsockopt (int option_, void *optval_,      size_t *optvallen_)  { +    sync.lock (); +      if (unlikely (ctx_terminated)) { +        sync.unlock ();          errno = ETERM;          return -1;      }      if (option_ == XS_RCVMORE) {          if (*optvallen_ < sizeof (int)) { +            sync.unlock ();              errno = EINVAL;              return -1;          }          *((int*) optval_) = rcvmore ? 1 : 0;          *optvallen_ = sizeof (int); +        sync.unlock ();          return 0;      }      if (option_ == XS_FD) {          if (*optvallen_ < sizeof (fd_t)) {              errno = EINVAL; +            sync.unlock ();              return -1;          }          *((fd_t*) optval_) = mailbox.get_fd ();          *optvallen_ = sizeof (fd_t); +        sync.unlock ();          return 0;      }      if (option_ == XS_EVENTS) {          if (*optvallen_ < sizeof (int)) {              errno = EINVAL; +            sync.unlock ();              return -1;          }          int rc = process_commands (0, false); -        if (rc != 0 && (errno == EINTR || errno == ETERM)) +        if (rc != 0 && (errno == EINTR || errno == ETERM)) { +            sync.unlock ();              return -1; +        }          errno_assert (rc == 0);          *((int*) optval_) = 0;          if (has_out ()) @@ -284,6 +302,7 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_,          if (has_in ())              *((int*) optval_) |= XS_POLLIN;          *optvallen_ = sizeof (int); +        sync.unlock ();          return 0;      } @@ -292,7 +311,10 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_,  int xs::socket_base_t::bind (const char *addr_)  { +    sync.lock (); +      if (unlikely (ctx_terminated)) { +        sync.unlock ();          errno = ETERM;          return -1;      } @@ -301,29 +323,38 @@ int xs::socket_base_t::bind (const char *addr_)      std::string protocol;      std::string address;      int rc = parse_uri (addr_, protocol, address); -    if (rc != 0) +    if (rc != 0) { +        sync.unlock ();          return -1; +    }      rc = check_protocol (protocol); -    if (rc != 0) +    if (rc != 0) { +        sync.unlock ();          return -1; +    }      if (protocol == "inproc") {          endpoint_t endpoint = {this, options}; -        return register_endpoint (addr_, endpoint); +        rc = register_endpoint (addr_, endpoint); +        sync.unlock (); +        return rc;      }      if (protocol == "pgm" || protocol == "epgm") {          //  For convenience's sake, bind can be used interchageable with          //  connect for PGM and EPGM transports. -        return connect (addr_);  +        rc = connect (addr_); +        sync.unlock (); +        return rc;      }      //  Remaining trasnports require to be run in an I/O thread, so at this      //  point we'll choose one.      io_thread_t *io_thread = choose_io_thread (options.affinity);      if (!io_thread) { +        sync.unlock ();          errno = EMTHREAD;          return -1;      } @@ -335,9 +366,11 @@ int xs::socket_base_t::bind (const char *addr_)          int rc = listener->set_address (address.c_str ());          if (rc != 0) {              delete listener; +            sync.unlock ();              return -1;          }          launch_child (listener); +        sync.unlock ();          return 0;      } @@ -349,9 +382,11 @@ int xs::socket_base_t::bind (const char *addr_)          int rc = listener->set_address (address.c_str ());          if (rc != 0) {              delete listener; +            sync.unlock ();              return -1;          }          launch_child (listener); +        sync.unlock ();          return 0;      }  #endif @@ -362,7 +397,10 @@ int xs::socket_base_t::bind (const char *addr_)  int xs::socket_base_t::connect (const char *addr_)  { +    sync.lock (); +      if (unlikely (ctx_terminated)) { +        sync.unlock ();          errno = ETERM;          return -1;      } @@ -371,12 +409,16 @@ int xs::socket_base_t::connect (const char *addr_)      std::string protocol;      std::string address;      int rc = parse_uri (addr_, protocol, address); -    if (rc != 0) +    if (rc != 0) { +        sync.unlock ();          return -1; +    }      rc = check_protocol (protocol); -    if (rc != 0) +    if (rc != 0) { +        sync.unlock ();          return -1; +    }      if (protocol == "inproc") { @@ -386,8 +428,10 @@ int xs::socket_base_t::connect (const char *addr_)          //  Find the peer endpoint.          endpoint_t peer = find_endpoint (addr_); -        if (!peer.socket) +        if (!peer.socket) { +            sync.unlock ();              return -1; +        }          // The total HWM for an inproc connection should be the sum of          // the binder's HWM and the connector's HWM. @@ -429,12 +473,14 @@ int xs::socket_base_t::connect (const char *addr_)          //  increased here.          send_bind (peer.socket, pipes [1], false); +        sync.unlock ();          return 0;      }      //  Choose the I/O thread to run the session in.      io_thread_t *io_thread = choose_io_thread (options.affinity);      if (!io_thread) { +        sync.unlock ();          errno = EMTHREAD;          return -1;      } @@ -467,27 +513,34 @@ int xs::socket_base_t::connect (const char *addr_)      //  Activate the session. Make it a child of this socket.      launch_child (session); +    sync.unlock ();      return 0;  }  int xs::socket_base_t::send (msg_t *msg_, int flags_)  { +    sync.lock (); +      //  Check whether the library haven't been shut down yet.      if (unlikely (ctx_terminated)) { +        sync.unlock ();          errno = ETERM;          return -1;      }      //  Check whether message passed to the function is valid.      if (unlikely (!msg_ || !msg_->check ())) { +        sync.unlock ();          errno = EFAULT;          return -1;      }      //  Process pending commands, if any.      int rc = process_commands (0, true); -    if (unlikely (rc != 0)) +    if (unlikely (rc != 0)) { +        sync.unlock ();          return -1; +    }      //  Clear any user-visible flags that are set on the message.      msg_->reset_flags (msg_t::more); @@ -498,15 +551,21 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_)      //  Try to send the message.      rc = xsend (msg_, flags_); -    if (rc == 0) +    if (rc == 0) { +        sync.unlock ();          return 0; -    if (unlikely (errno != EAGAIN)) +    } +    if (unlikely (errno != EAGAIN)) { +        sync.unlock ();          return -1; +    }      //  In case of non-blocking send we'll simply propagate      //  the error - including EAGAIN - up the stack. -    if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) +    if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) { +        sync.unlock ();          return -1; +    }      //  Compute the time when the timeout should occur.      //  If the timeout is infite, don't care.  @@ -518,42 +577,55 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_)      //  command, process it and try to send the message again.      //  If timeout is reached in the meantime, return EAGAIN.      while (true) { -        if (unlikely (process_commands (timeout, false) != 0)) +        if (unlikely (process_commands (timeout, false) != 0)) { +            sync.unlock ();              return -1; +        }          rc = xsend (msg_, flags_);          if (rc == 0)              break; -        if (unlikely (errno != EAGAIN)) +        if (unlikely (errno != EAGAIN)) { +            sync.unlock ();              return -1; +        }          if (timeout > 0) {              timeout = (int) (end - clock.now_ms ());              if (timeout <= 0) { +                sync.unlock ();                  errno = EAGAIN;                  return -1;              }          }      } + +    sync.unlock ();      return 0;  }  int xs::socket_base_t::recv (msg_t *msg_, int flags_)  { +    sync.lock (); +      //  Check whether the library haven't been shut down yet.      if (unlikely (ctx_terminated)) { +        sync.unlock ();          errno = ETERM;          return -1;      }      //  Check whether message passed to the function is valid.      if (unlikely (!msg_ || !msg_->check ())) { +        sync.unlock ();          errno = EFAULT;          return -1;      }      //  Get the message.      int rc = xrecv (msg_, flags_); -    if (unlikely (rc != 0 && errno != EAGAIN)) +    if (unlikely (rc != 0 && errno != EAGAIN)) { +        sync.unlock ();          return -1; +    }      //  Once every inbound_poll_rate messages check for signals and process      //  incoming commands. This happens only if we are not polling altogether @@ -572,6 +644,7 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)      //  If we have the message, return immediately.      if (rc == 0) {          extract_flags (msg_); +        sync.unlock ();          return 0;      } @@ -580,14 +653,20 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)      //  activate_reader command already waiting int a command pipe.      //  If it's not, return EAGAIN.      if (flags_ & XS_DONTWAIT || options.rcvtimeo == 0) { -        if (unlikely (process_commands (0, false) != 0)) +        if (unlikely (process_commands (0, false) != 0)) { +            sync.unlock ();              return -1; +        }          ticks = 0;          rc = xrecv (msg_, flags_); -        if (rc < 0) +        if (rc < 0) { +            sync.unlock ();              return rc; +        } +          extract_flags (msg_); +        sync.unlock ();          return 0;      } @@ -601,19 +680,24 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)      //  we are able to fetch a message.      bool block = (ticks != 0);      while (true) { -        if (unlikely (process_commands (block ? timeout : 0, false) != 0)) +        if (unlikely (process_commands (block ? timeout : 0, false) != 0)) { +            sync.unlock ();              return -1; +        }          rc = xrecv (msg_, flags_);          if (rc == 0) {              ticks = 0;              break;          } -        if (unlikely (errno != EAGAIN)) +        if (unlikely (errno != EAGAIN)) { +            sync.unlock ();              return -1; +        }          block = true;          if (timeout > 0) {              timeout = (int) (end - clock.now_ms ());              if (timeout <= 0) { +                sync.unlock ();                  errno = EAGAIN;                  return -1;              } @@ -621,11 +705,14 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)      }      extract_flags (msg_); +    sync.unlock ();      return 0;  }  int xs::socket_base_t::close ()  { +    sync.lock (); +      //  Mark the socket as dead.      tag = 0xdeadbeef; @@ -634,17 +721,24 @@ int xs::socket_base_t::close ()      //  process.      send_reap (this); +    sync.unlock ();      return 0;  }  bool xs::socket_base_t::has_in ()  { -    return xhas_in (); +    sync.lock (); +    bool ret = xhas_in (); +    sync.unlock (); +    return ret;  }  bool xs::socket_base_t::has_out ()  { -    return xhas_out (); +    sync.lock (); +    bool ret = xhas_out (); +    sync.unlock (); +    return ret;  }  void xs::socket_base_t::start_reaping (poller_base_t *poller_) diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 1e35ffa..850586e 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -141,6 +141,10 @@ namespace xs          //  Used to check whether the object is a socket.          uint32_t tag; +        //  Synchronisation of access to the socket. If Crossroads are running +        //  in non-reentrant mode, it is a dummy mutex-like object. +        mutex_t sync; +          //  If true, associated context was already terminated.          bool ctx_terminated; diff --git a/tests/Makefile.am b/tests/Makefile.am index fba6271..78bac57 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -18,7 +18,8 @@ noinst_PROGRAMS = pair_inproc \                    pair_ipc \                    reqrep_ipc \                    timeo \ -                  max_sockets +                  max_sockets \ +                  reentrant  pair_inproc_SOURCES = pair_inproc.cpp testutil.hpp  pair_tcp_SOURCES = pair_tcp.cpp testutil.hpp @@ -36,5 +37,6 @@ pair_ipc_SOURCES = pair_ipc.cpp testutil.hpp  reqrep_ipc_SOURCES = reqrep_ipc.cpp testutil.hpp  timeo_SOURCES = timeo.cpp  max_sockets_SOURCES = max_sockets.cpp +reentrant_SOURCES = reentrant.cpp testutil.hpp  TESTS = $(noinst_PROGRAMS) diff --git a/tests/reentrant.cpp b/tests/reentrant.cpp new file mode 100644 index 0000000..a4b2760 --- /dev/null +++ b/tests/reentrant.cpp @@ -0,0 +1,53 @@ +/* +    Copyright (c) 2012 250bpm s.r.o. +    Copyright (c) 2012 Other contributors as noted in the AUTHORS file + +    This file is part of Crossroads project. + +    Crossroads is free software; you can redistribute it and/or modify it under +    the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    Crossroads is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "testutil.hpp" + +int XS_TEST_MAIN () +{ +    fprintf (stderr, "reentrant test running...\n"); + +    //  Initialise the context and set REENTRANT option. +    void *ctx = xs_init (1); +    assert (ctx); +    int val = 1; +    int rc = xs_setctxopt (ctx, XS_CTX_REENTRANT, &val, sizeof (val)); +    assert (rc == 0); + +    //  Do a set of operations to make sure that REENTRANT option doesn't +    //  break anything. +    void *sb = xs_socket (ctx, XS_REP); +    assert (sb); +    rc = xs_bind (sb, "inproc://a"); +    assert (rc == 0); +    void *sc = xs_socket (ctx, XS_REQ); +    assert (sc); +    rc = xs_connect (sc, "inproc://a"); +    assert (rc == 0); +    bounce (sb, sc); +    rc = xs_close (sc); +    assert (rc == 0); +    rc = xs_close (sb); +    assert (rc == 0); +    rc = xs_term (ctx); +    assert (rc == 0); + +    return 0 ; +}  | 
