diff options
| -rw-r--r-- | AUTHORS | 1 | ||||
| -rw-r--r-- | src/mutex.hpp | 53 | ||||
| -rw-r--r-- | src/socket_base.cpp | 133 | ||||
| -rw-r--r-- | src/socket_base.hpp | 5 | ||||
| -rw-r--r-- | src/xs.cpp | 48 | 
5 files changed, 90 insertions, 150 deletions
| @@ -97,6 +97,7 @@ Evgueny Khartchenko <Evgueny.Khartchenko@intel.com>  Frank Vanden Berghen <frank@applied-mathematics.net>  Ian Barber <ian.barber@gmail.com>  John Apps <john.apps@hp.com> +John Skaller <skaller@users.sourceforge.net>  Markus Fischer <markus.fischer@hpc-solutions.com>  Matt Muggeridge <Matt.Muggeridge@hp.com>  Michael Santy <Michael.Santy@dynetics.com> diff --git a/src/mutex.hpp b/src/mutex.hpp index fc75bed..c0a2cc9 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -37,34 +37,28 @@ namespace xs      class mutex_t      {      public: -        inline mutex_t (bool fake_ = false) : -            fake (fake_) +        inline mutex_t ()          { -            if (!fake) -                InitializeCriticalSection (&cs); +            InitializeCriticalSection (&cs);          }          inline ~mutex_t ()          { -            if (!fake) -                DeleteCriticalSection (&cs); +            DeleteCriticalSection (&cs);          }          inline void lock ()          { -            if (!fake) -                EnterCriticalSection (&cs); +            EnterCriticalSection (&cs);          }          inline void unlock ()          { -            if (!fake) -                LeaveCriticalSection (&cs); +            LeaveCriticalSection (&cs);          }      private: -        bool fake;          CRITICAL_SECTION cs;          //  Disable copy construction and assignment. @@ -84,46 +78,36 @@ namespace xs      class mutex_t      {      public: -        inline mutex_t (bool fake_ = false) : -            fake (fake_) +        inline mutex_t ()          { -            if (!fake) { -                int rc = pthread_mutex_init (&mutex, NULL); -                if (rc) -                    posix_assert (rc); -            } +            int rc = pthread_mutex_init (&mutex, NULL); +            if (rc) +                posix_assert (rc);          }          inline ~mutex_t ()          { -            if (!fake) { -                int rc = pthread_mutex_destroy (&mutex); -                if (rc) -                    posix_assert (rc); -            } +            int rc = pthread_mutex_destroy (&mutex); +            if (rc) +                posix_assert (rc);          }          inline void lock ()          { -            if (!fake) { -                int rc = pthread_mutex_lock (&mutex); -                if (rc) -                    posix_assert (rc); -            } +            int rc = pthread_mutex_lock (&mutex); +            if (rc) +                posix_assert (rc);          }          inline void unlock ()          { -            if (!fake) { -                int rc = pthread_mutex_unlock (&mutex); -                if (rc) -                    posix_assert (rc); -            } +            int rc = pthread_mutex_unlock (&mutex); +            if (rc) +                posix_assert (rc);          }      private: -        bool fake;          pthread_mutex_t mutex;          // Disable copy construction and assignment. @@ -136,3 +120,4 @@ namespace xs  #endif  #endif + diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fb387db..97ac4b4 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -117,7 +117,7 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_,  xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :      own_t (parent_, tid_),      tag (0xbaddecaf), -    sync (!parent_->is_reentrant ()), +    reentrant (parent_->is_reentrant ()),      ctx_terminated (false),      destroyed (false),      last_tsc (0), @@ -146,6 +146,18 @@ void xs::socket_base_t::stop ()      send_stop ();  } +void xs::socket_base_t::lock () +{ +    if (reentrant) +        sync.lock (); +} + +void xs::socket_base_t::unlock () +{ +    if (reentrant) +        sync.unlock (); +} +  int xs::socket_base_t::parse_uri (const char *uri_,                          std::string &protocol_, std::string &address_)  { @@ -227,74 +239,58 @@ void xs::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_)  int xs::socket_base_t::setsockopt (int option_, const void *optval_,      size_t optvallen_)  { -    sync.lock (); -      if (unlikely (ctx_terminated)) { -        sync.unlock ();          errno = ETERM;          return -1;      }      //  First, check whether specific socket type overloads the option.      int rc = xsetsockopt (option_, optval_, optvallen_); -    if (rc == 0 || errno != EINVAL) { -        sync.unlock (); +    if (rc == 0 || errno != EINVAL)          return rc; -    }      //  If the socket type doesn't support the option, pass it to      //  the generic option parser.      rc = options.setsockopt (option_, optval_, optvallen_); -    sync.unlock ();      return rc;  }  int xs::socket_base_t::getsockopt (int option_, void *optval_,      size_t *optvallen_)  { -    sync.lock (); -      if (unlikely (ctx_terminated)) { -        sync.unlock ();          errno = ETERM;          return -1;      }      if (option_ == XS_RCVMORE) {          if (*optvallen_ < sizeof (int)) { -            sync.unlock ();              errno = EINVAL;              return -1;          }          *((int*) optval_) = rcvmore ? 1 : 0;          *optvallen_ = sizeof (int); -        sync.unlock ();          return 0;      }      if (option_ == XS_FD) {          if (*optvallen_ < sizeof (fd_t)) {              errno = EINVAL; -            sync.unlock ();              return -1;          }          *((fd_t*) optval_) = mailbox.get_fd ();          *optvallen_ = sizeof (fd_t); -        sync.unlock ();          return 0;      }      if (option_ == XS_EVENTS) {          if (*optvallen_ < sizeof (int)) {              errno = EINVAL; -            sync.unlock ();              return -1;          }          int rc = process_commands (0, false); -        if (rc != 0 && (errno == EINTR || errno == ETERM)) { -            sync.unlock (); +        if (rc != 0 && (errno == EINTR || errno == ETERM))              return -1; -        }          errno_assert (rc == 0);          *((int*) optval_) = 0;          if (has_out ()) @@ -302,7 +298,6 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_,          if (has_in ())              *((int*) optval_) |= XS_POLLIN;          *optvallen_ = sizeof (int); -        sync.unlock ();          return 0;      } @@ -311,10 +306,7 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_,  int xs::socket_base_t::bind (const char *addr_)  { -    sync.lock (); -      if (unlikely (ctx_terminated)) { -        sync.unlock ();          errno = ETERM;          return -1;      } @@ -323,21 +315,16 @@ int xs::socket_base_t::bind (const char *addr_)      std::string protocol;      std::string address;      int rc = parse_uri (addr_, protocol, address); -    if (rc != 0) { -        sync.unlock (); +    if (rc != 0)          return -1; -    }      rc = check_protocol (protocol); -    if (rc != 0) { -        sync.unlock (); +    if (rc != 0)          return -1; -    }      if (protocol == "inproc") {          endpoint_t endpoint = {this, options};          rc = register_endpoint (addr_, endpoint); -        sync.unlock ();          return rc;      } @@ -346,7 +333,6 @@ int xs::socket_base_t::bind (const char *addr_)          //  For convenience's sake, bind can be used interchageable with          //  connect for PGM and EPGM transports.          rc = connect (addr_); -        sync.unlock ();          return rc;      } @@ -354,7 +340,6 @@ int xs::socket_base_t::bind (const char *addr_)      //  point we'll choose one.      io_thread_t *io_thread = choose_io_thread (options.affinity);      if (!io_thread) { -        sync.unlock ();          errno = EMTHREAD;          return -1;      } @@ -366,11 +351,9 @@ int xs::socket_base_t::bind (const char *addr_)          int rc = listener->set_address (address.c_str ());          if (rc != 0) {              delete listener; -            sync.unlock ();              return -1;          }          launch_child (listener); -        sync.unlock ();          return 0;      } @@ -382,11 +365,9 @@ int xs::socket_base_t::bind (const char *addr_)          int rc = listener->set_address (address.c_str ());          if (rc != 0) {              delete listener; -            sync.unlock ();              return -1;          }          launch_child (listener); -        sync.unlock ();          return 0;      }  #endif @@ -397,10 +378,7 @@ int xs::socket_base_t::bind (const char *addr_)  int xs::socket_base_t::connect (const char *addr_)  { -    sync.lock (); -      if (unlikely (ctx_terminated)) { -        sync.unlock ();          errno = ETERM;          return -1;      } @@ -409,16 +387,12 @@ int xs::socket_base_t::connect (const char *addr_)      std::string protocol;      std::string address;      int rc = parse_uri (addr_, protocol, address); -    if (rc != 0) { -        sync.unlock (); +    if (rc != 0)          return -1; -    }      rc = check_protocol (protocol); -    if (rc != 0) { -        sync.unlock (); +    if (rc != 0)          return -1; -    }      if (protocol == "inproc") { @@ -428,10 +402,8 @@ int xs::socket_base_t::connect (const char *addr_)          //  Find the peer endpoint.          endpoint_t peer = find_endpoint (addr_); -        if (!peer.socket) { -            sync.unlock (); +        if (!peer.socket)              return -1; -        }          // The total HWM for an inproc connection should be the sum of          // the binder's HWM and the connector's HWM. @@ -473,14 +445,12 @@ int xs::socket_base_t::connect (const char *addr_)          //  increased here.          send_bind (peer.socket, pipes [1], false); -        sync.unlock ();          return 0;      }      //  Choose the I/O thread to run the session in.      io_thread_t *io_thread = choose_io_thread (options.affinity);      if (!io_thread) { -        sync.unlock ();          errno = EMTHREAD;          return -1;      } @@ -513,34 +483,27 @@ int xs::socket_base_t::connect (const char *addr_)      //  Activate the session. Make it a child of this socket.      launch_child (session); -    sync.unlock ();      return 0;  }  int xs::socket_base_t::send (msg_t *msg_, int flags_)  { -    sync.lock (); -      //  Check whether the library haven't been shut down yet.      if (unlikely (ctx_terminated)) { -        sync.unlock ();          errno = ETERM;          return -1;      }      //  Check whether message passed to the function is valid.      if (unlikely (!msg_ || !msg_->check ())) { -        sync.unlock ();          errno = EFAULT;          return -1;      }      //  Process pending commands, if any.      int rc = process_commands (0, true); -    if (unlikely (rc != 0)) { -        sync.unlock (); +    if (unlikely (rc != 0))          return -1; -    }      //  Clear any user-visible flags that are set on the message.      msg_->reset_flags (msg_t::more); @@ -551,21 +514,15 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_)      //  Try to send the message.      rc = xsend (msg_, flags_); -    if (rc == 0) { -        sync.unlock (); +    if (rc == 0)          return 0; -    } -    if (unlikely (errno != EAGAIN)) { -        sync.unlock (); +    if (unlikely (errno != EAGAIN))          return -1; -    }      //  In case of non-blocking send we'll simply propagate      //  the error - including EAGAIN - up the stack. -    if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) { -        sync.unlock (); +    if (flags_ & XS_DONTWAIT || options.sndtimeo == 0)          return -1; -    }      //  Compute the time when the timeout should occur.      //  If the timeout is infite, don't care.  @@ -577,55 +534,43 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_)      //  command, process it and try to send the message again.      //  If timeout is reached in the meantime, return EAGAIN.      while (true) { -        if (unlikely (process_commands (timeout, false) != 0)) { -            sync.unlock (); +        if (unlikely (process_commands (timeout, false) != 0))              return -1; -        }          rc = xsend (msg_, flags_);          if (rc == 0)              break; -        if (unlikely (errno != EAGAIN)) { -            sync.unlock (); +        if (unlikely (errno != EAGAIN))              return -1; -        }          if (timeout > 0) {              timeout = (int) (end - clock.now_ms ());              if (timeout <= 0) { -                sync.unlock ();                  errno = EAGAIN;                  return -1;              }          }      } -    sync.unlock ();      return 0;  }  int xs::socket_base_t::recv (msg_t *msg_, int flags_)  { -    sync.lock (); -      //  Check whether the library haven't been shut down yet.      if (unlikely (ctx_terminated)) { -        sync.unlock ();          errno = ETERM;          return -1;      }      //  Check whether message passed to the function is valid.      if (unlikely (!msg_ || !msg_->check ())) { -        sync.unlock ();          errno = EFAULT;          return -1;      }      //  Get the message.      int rc = xrecv (msg_, flags_); -    if (unlikely (rc != 0 && errno != EAGAIN)) { -        sync.unlock (); +    if (unlikely (rc != 0 && errno != EAGAIN))          return -1; -    }      //  Once every inbound_poll_rate messages check for signals and process      //  incoming commands. This happens only if we are not polling altogether @@ -644,7 +589,6 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)      //  If we have the message, return immediately.      if (rc == 0) {          extract_flags (msg_); -        sync.unlock ();          return 0;      } @@ -653,20 +597,15 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)      //  activate_reader command already waiting int a command pipe.      //  If it's not, return EAGAIN.      if (flags_ & XS_DONTWAIT || options.rcvtimeo == 0) { -        if (unlikely (process_commands (0, false) != 0)) { -            sync.unlock (); +        if (unlikely (process_commands (0, false) != 0))              return -1; -        }          ticks = 0;          rc = xrecv (msg_, flags_); -        if (rc < 0) { -            sync.unlock (); +        if (rc < 0)              return rc; -        }          extract_flags (msg_); -        sync.unlock ();          return 0;      } @@ -680,24 +619,19 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)      //  we are able to fetch a message.      bool block = (ticks != 0);      while (true) { -        if (unlikely (process_commands (block ? timeout : 0, false) != 0)) { -            sync.unlock (); +        if (unlikely (process_commands (block ? timeout : 0, false) != 0))              return -1; -        }          rc = xrecv (msg_, flags_);          if (rc == 0) {              ticks = 0;              break;          } -        if (unlikely (errno != EAGAIN)) { -            sync.unlock (); +        if (unlikely (errno != EAGAIN))              return -1; -        }          block = true;          if (timeout > 0) {              timeout = (int) (end - clock.now_ms ());              if (timeout <= 0) { -                sync.unlock ();                  errno = EAGAIN;                  return -1;              } @@ -705,7 +639,6 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)      }      extract_flags (msg_); -    sync.unlock ();      return 0;  } @@ -724,17 +657,13 @@ int xs::socket_base_t::close ()  bool xs::socket_base_t::has_in ()  { -    sync.lock ();      bool ret = xhas_in (); -    sync.unlock ();      return ret;  }  bool xs::socket_base_t::has_out ()  { -    sync.lock ();      bool ret = xhas_out (); -    sync.unlock ();      return ret;  } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 850586e..8b9a948 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -65,6 +65,10 @@ namespace xs          //  This function can be called from a different thread!          void stop (); +        //  Synchronise access of application threads to the socket. +        void lock (); +        void unlock (); +          //  Interface for communication with the API layer.          int setsockopt (int option_, const void *optval_, size_t optvallen_);          int getsockopt (int option_, void *optval_, size_t *optvallen_); @@ -143,6 +147,7 @@ namespace xs          //  Synchronisation of access to the socket. If Crossroads are running          //  in non-reentrant mode, it is a dummy mutex-like object. +        bool reentrant;          mutex_t sync;          //  If true, associated context was already terminated. @@ -202,40 +202,54 @@ int xs_close (void *s_)  int xs_setsockopt (void *s_, int option_, const void *optval_,      size_t optvallen_)  { -    if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) { +    xs::socket_base_t *s = (xs::socket_base_t*) s_; +    if (!s || !s->check_tag ()) {          errno = ENOTSOCK;          return -1;      } -    return (((xs::socket_base_t*) s_)->setsockopt (option_, optval_, -        optvallen_)); +    s->lock (); +    int rc = s->setsockopt (option_, optval_, optvallen_); +    s->unlock (); +    return rc;  }  int xs_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)  { -    if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) { +    xs::socket_base_t *s = (xs::socket_base_t*) s_; +    if (!s || !s->check_tag ()) {          errno = ENOTSOCK;          return -1;      } -    return (((xs::socket_base_t*) s_)->getsockopt (option_, optval_, -        optvallen_)); +    s->lock (); +    int rc = s->getsockopt (option_, optval_, optvallen_); +    s->unlock (); +    return rc;  }  int xs_bind (void *s_, const char *addr_)  { -    if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) { +    xs::socket_base_t *s = (xs::socket_base_t*) s_; +    if (!s || !s->check_tag ()) {          errno = ENOTSOCK;          return -1;      } -    return (((xs::socket_base_t*) s_)->bind (addr_)); +    s->lock (); +    int rc = s->bind (addr_); +    s->unlock (); +    return rc;  }  int xs_connect (void *s_, const char *addr_)  { -    if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) { +    xs::socket_base_t *s = (xs::socket_base_t*) s_; +    if (!s || !s->check_tag ()) {          errno = ENOTSOCK;          return -1;      } -    return (((xs::socket_base_t*) s_)->connect (addr_)); +    s->lock (); +    int rc = s->connect (addr_); +    s->unlock (); +    return rc;  }  int xs_send (void *s_, const void *buf_, size_t len_, int flags_) @@ -288,12 +302,15 @@ int xs_recv (void *s_, void *buf_, size_t len_, int flags_)  int xs_sendmsg (void *s_, xs_msg_t *msg_, int flags_)  { -    if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) { +    xs::socket_base_t *s = (xs::socket_base_t*) s_; +    if (!s || !s->check_tag ()) {          errno = ENOTSOCK;          return -1;      }      int sz = (int) xs_msg_size (msg_); -    int rc = (((xs::socket_base_t*) s_)->send ((xs::msg_t*) msg_, flags_)); +    s->lock (); +    int rc = s->send ((xs::msg_t*) msg_, flags_); +    s->unlock ();      if (unlikely (rc < 0))          return -1;      return sz; @@ -301,11 +318,14 @@ int xs_sendmsg (void *s_, xs_msg_t *msg_, int flags_)  int xs_recvmsg (void *s_, xs_msg_t *msg_, int flags_)  { -    if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) { +    xs::socket_base_t *s = (xs::socket_base_t*) s_; +    if (!s || !s->check_tag ()) {          errno = ENOTSOCK;          return -1;      } -    int rc = (((xs::socket_base_t*) s_)->recv ((xs::msg_t*) msg_, flags_)); +    s->lock (); +    int rc = s->recv ((xs::msg_t*) msg_, flags_); +    s->unlock ();      if (unlikely (rc < 0))          return -1;      return (int) xs_msg_size (msg_); | 
