summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mutex.hpp53
-rw-r--r--src/socket_base.cpp133
-rw-r--r--src/socket_base.hpp5
-rw-r--r--src/xs.cpp48
4 files changed, 89 insertions, 150 deletions
diff --git a/src/mutex.hpp b/src/mutex.hpp
index fc75bed..c0a2cc9 100644
--- a/src/mutex.hpp
+++ b/src/mutex.hpp
@@ -37,34 +37,28 @@ namespace xs
class mutex_t
{
public:
- inline mutex_t (bool fake_ = false) :
- fake (fake_)
+ inline mutex_t ()
{
- if (!fake)
- InitializeCriticalSection (&cs);
+ InitializeCriticalSection (&cs);
}
inline ~mutex_t ()
{
- if (!fake)
- DeleteCriticalSection (&cs);
+ DeleteCriticalSection (&cs);
}
inline void lock ()
{
- if (!fake)
- EnterCriticalSection (&cs);
+ EnterCriticalSection (&cs);
}
inline void unlock ()
{
- if (!fake)
- LeaveCriticalSection (&cs);
+ LeaveCriticalSection (&cs);
}
private:
- bool fake;
CRITICAL_SECTION cs;
// Disable copy construction and assignment.
@@ -84,46 +78,36 @@ namespace xs
class mutex_t
{
public:
- inline mutex_t (bool fake_ = false) :
- fake (fake_)
+ inline mutex_t ()
{
- if (!fake) {
- int rc = pthread_mutex_init (&mutex, NULL);
- if (rc)
- posix_assert (rc);
- }
+ int rc = pthread_mutex_init (&mutex, NULL);
+ if (rc)
+ posix_assert (rc);
}
inline ~mutex_t ()
{
- if (!fake) {
- int rc = pthread_mutex_destroy (&mutex);
- if (rc)
- posix_assert (rc);
- }
+ int rc = pthread_mutex_destroy (&mutex);
+ if (rc)
+ posix_assert (rc);
}
inline void lock ()
{
- if (!fake) {
- int rc = pthread_mutex_lock (&mutex);
- if (rc)
- posix_assert (rc);
- }
+ int rc = pthread_mutex_lock (&mutex);
+ if (rc)
+ posix_assert (rc);
}
inline void unlock ()
{
- if (!fake) {
- int rc = pthread_mutex_unlock (&mutex);
- if (rc)
- posix_assert (rc);
- }
+ int rc = pthread_mutex_unlock (&mutex);
+ if (rc)
+ posix_assert (rc);
}
private:
- bool fake;
pthread_mutex_t mutex;
// Disable copy construction and assignment.
@@ -136,3 +120,4 @@ namespace xs
#endif
#endif
+
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index fb387db..97ac4b4 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -117,7 +117,7 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_,
xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
own_t (parent_, tid_),
tag (0xbaddecaf),
- sync (!parent_->is_reentrant ()),
+ reentrant (parent_->is_reentrant ()),
ctx_terminated (false),
destroyed (false),
last_tsc (0),
@@ -146,6 +146,18 @@ void xs::socket_base_t::stop ()
send_stop ();
}
+void xs::socket_base_t::lock ()
+{
+ if (reentrant)
+ sync.lock ();
+}
+
+void xs::socket_base_t::unlock ()
+{
+ if (reentrant)
+ sync.unlock ();
+}
+
int xs::socket_base_t::parse_uri (const char *uri_,
std::string &protocol_, std::string &address_)
{
@@ -227,74 +239,58 @@ void xs::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_)
int xs::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
- sync.lock ();
-
if (unlikely (ctx_terminated)) {
- sync.unlock ();
errno = ETERM;
return -1;
}
// First, check whether specific socket type overloads the option.
int rc = xsetsockopt (option_, optval_, optvallen_);
- if (rc == 0 || errno != EINVAL) {
- sync.unlock ();
+ if (rc == 0 || errno != EINVAL)
return rc;
- }
// If the socket type doesn't support the option, pass it to
// the generic option parser.
rc = options.setsockopt (option_, optval_, optvallen_);
- sync.unlock ();
return rc;
}
int xs::socket_base_t::getsockopt (int option_, void *optval_,
size_t *optvallen_)
{
- sync.lock ();
-
if (unlikely (ctx_terminated)) {
- sync.unlock ();
errno = ETERM;
return -1;
}
if (option_ == XS_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
- sync.unlock ();
errno = EINVAL;
return -1;
}
*((int*) optval_) = rcvmore ? 1 : 0;
*optvallen_ = sizeof (int);
- sync.unlock ();
return 0;
}
if (option_ == XS_FD) {
if (*optvallen_ < sizeof (fd_t)) {
errno = EINVAL;
- sync.unlock ();
return -1;
}
*((fd_t*) optval_) = mailbox.get_fd ();
*optvallen_ = sizeof (fd_t);
- sync.unlock ();
return 0;
}
if (option_ == XS_EVENTS) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
- sync.unlock ();
return -1;
}
int rc = process_commands (0, false);
- if (rc != 0 && (errno == EINTR || errno == ETERM)) {
- sync.unlock ();
+ if (rc != 0 && (errno == EINTR || errno == ETERM))
return -1;
- }
errno_assert (rc == 0);
*((int*) optval_) = 0;
if (has_out ())
@@ -302,7 +298,6 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_,
if (has_in ())
*((int*) optval_) |= XS_POLLIN;
*optvallen_ = sizeof (int);
- sync.unlock ();
return 0;
}
@@ -311,10 +306,7 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_,
int xs::socket_base_t::bind (const char *addr_)
{
- sync.lock ();
-
if (unlikely (ctx_terminated)) {
- sync.unlock ();
errno = ETERM;
return -1;
}
@@ -323,21 +315,16 @@ int xs::socket_base_t::bind (const char *addr_)
std::string protocol;
std::string address;
int rc = parse_uri (addr_, protocol, address);
- if (rc != 0) {
- sync.unlock ();
+ if (rc != 0)
return -1;
- }
rc = check_protocol (protocol);
- if (rc != 0) {
- sync.unlock ();
+ if (rc != 0)
return -1;
- }
if (protocol == "inproc") {
endpoint_t endpoint = {this, options};
rc = register_endpoint (addr_, endpoint);
- sync.unlock ();
return rc;
}
@@ -346,7 +333,6 @@ int xs::socket_base_t::bind (const char *addr_)
// For convenience's sake, bind can be used interchageable with
// connect for PGM and EPGM transports.
rc = connect (addr_);
- sync.unlock ();
return rc;
}
@@ -354,7 +340,6 @@ int xs::socket_base_t::bind (const char *addr_)
// point we'll choose one.
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
- sync.unlock ();
errno = EMTHREAD;
return -1;
}
@@ -366,11 +351,9 @@ int xs::socket_base_t::bind (const char *addr_)
int rc = listener->set_address (address.c_str ());
if (rc != 0) {
delete listener;
- sync.unlock ();
return -1;
}
launch_child (listener);
- sync.unlock ();
return 0;
}
@@ -382,11 +365,9 @@ int xs::socket_base_t::bind (const char *addr_)
int rc = listener->set_address (address.c_str ());
if (rc != 0) {
delete listener;
- sync.unlock ();
return -1;
}
launch_child (listener);
- sync.unlock ();
return 0;
}
#endif
@@ -397,10 +378,7 @@ int xs::socket_base_t::bind (const char *addr_)
int xs::socket_base_t::connect (const char *addr_)
{
- sync.lock ();
-
if (unlikely (ctx_terminated)) {
- sync.unlock ();
errno = ETERM;
return -1;
}
@@ -409,16 +387,12 @@ int xs::socket_base_t::connect (const char *addr_)
std::string protocol;
std::string address;
int rc = parse_uri (addr_, protocol, address);
- if (rc != 0) {
- sync.unlock ();
+ if (rc != 0)
return -1;
- }
rc = check_protocol (protocol);
- if (rc != 0) {
- sync.unlock ();
+ if (rc != 0)
return -1;
- }
if (protocol == "inproc") {
@@ -428,10 +402,8 @@ int xs::socket_base_t::connect (const char *addr_)
// Find the peer endpoint.
endpoint_t peer = find_endpoint (addr_);
- if (!peer.socket) {
- sync.unlock ();
+ if (!peer.socket)
return -1;
- }
// The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM.
@@ -473,14 +445,12 @@ int xs::socket_base_t::connect (const char *addr_)
// increased here.
send_bind (peer.socket, pipes [1], false);
- sync.unlock ();
return 0;
}
// Choose the I/O thread to run the session in.
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
- sync.unlock ();
errno = EMTHREAD;
return -1;
}
@@ -513,34 +483,27 @@ int xs::socket_base_t::connect (const char *addr_)
// Activate the session. Make it a child of this socket.
launch_child (session);
- sync.unlock ();
return 0;
}
int xs::socket_base_t::send (msg_t *msg_, int flags_)
{
- sync.lock ();
-
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
- sync.unlock ();
errno = ETERM;
return -1;
}
// Check whether message passed to the function is valid.
if (unlikely (!msg_ || !msg_->check ())) {
- sync.unlock ();
errno = EFAULT;
return -1;
}
// Process pending commands, if any.
int rc = process_commands (0, true);
- if (unlikely (rc != 0)) {
- sync.unlock ();
+ if (unlikely (rc != 0))
return -1;
- }
// Clear any user-visible flags that are set on the message.
msg_->reset_flags (msg_t::more);
@@ -551,21 +514,15 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_)
// Try to send the message.
rc = xsend (msg_, flags_);
- if (rc == 0) {
- sync.unlock ();
+ if (rc == 0)
return 0;
- }
- if (unlikely (errno != EAGAIN)) {
- sync.unlock ();
+ if (unlikely (errno != EAGAIN))
return -1;
- }
// In case of non-blocking send we'll simply propagate
// the error - including EAGAIN - up the stack.
- if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) {
- sync.unlock ();
+ if (flags_ & XS_DONTWAIT || options.sndtimeo == 0)
return -1;
- }
// Compute the time when the timeout should occur.
// If the timeout is infite, don't care.
@@ -577,55 +534,43 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_)
// command, process it and try to send the message again.
// If timeout is reached in the meantime, return EAGAIN.
while (true) {
- if (unlikely (process_commands (timeout, false) != 0)) {
- sync.unlock ();
+ if (unlikely (process_commands (timeout, false) != 0))
return -1;
- }
rc = xsend (msg_, flags_);
if (rc == 0)
break;
- if (unlikely (errno != EAGAIN)) {
- sync.unlock ();
+ if (unlikely (errno != EAGAIN))
return -1;
- }
if (timeout > 0) {
timeout = (int) (end - clock.now_ms ());
if (timeout <= 0) {
- sync.unlock ();
errno = EAGAIN;
return -1;
}
}
}
- sync.unlock ();
return 0;
}
int xs::socket_base_t::recv (msg_t *msg_, int flags_)
{
- sync.lock ();
-
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
- sync.unlock ();
errno = ETERM;
return -1;
}
// Check whether message passed to the function is valid.
if (unlikely (!msg_ || !msg_->check ())) {
- sync.unlock ();
errno = EFAULT;
return -1;
}
// Get the message.
int rc = xrecv (msg_, flags_);
- if (unlikely (rc != 0 && errno != EAGAIN)) {
- sync.unlock ();
+ if (unlikely (rc != 0 && errno != EAGAIN))
return -1;
- }
// Once every inbound_poll_rate messages check for signals and process
// incoming commands. This happens only if we are not polling altogether
@@ -644,7 +589,6 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)
// If we have the message, return immediately.
if (rc == 0) {
extract_flags (msg_);
- sync.unlock ();
return 0;
}
@@ -653,20 +597,15 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)
// activate_reader command already waiting int a command pipe.
// If it's not, return EAGAIN.
if (flags_ & XS_DONTWAIT || options.rcvtimeo == 0) {
- if (unlikely (process_commands (0, false) != 0)) {
- sync.unlock ();
+ if (unlikely (process_commands (0, false) != 0))
return -1;
- }
ticks = 0;
rc = xrecv (msg_, flags_);
- if (rc < 0) {
- sync.unlock ();
+ if (rc < 0)
return rc;
- }
extract_flags (msg_);
- sync.unlock ();
return 0;
}
@@ -680,24 +619,19 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)
// we are able to fetch a message.
bool block = (ticks != 0);
while (true) {
- if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
- sync.unlock ();
+ if (unlikely (process_commands (block ? timeout : 0, false) != 0))
return -1;
- }
rc = xrecv (msg_, flags_);
if (rc == 0) {
ticks = 0;
break;
}
- if (unlikely (errno != EAGAIN)) {
- sync.unlock ();
+ if (unlikely (errno != EAGAIN))
return -1;
- }
block = true;
if (timeout > 0) {
timeout = (int) (end - clock.now_ms ());
if (timeout <= 0) {
- sync.unlock ();
errno = EAGAIN;
return -1;
}
@@ -705,7 +639,6 @@ int xs::socket_base_t::recv (msg_t *msg_, int flags_)
}
extract_flags (msg_);
- sync.unlock ();
return 0;
}
@@ -724,17 +657,13 @@ int xs::socket_base_t::close ()
bool xs::socket_base_t::has_in ()
{
- sync.lock ();
bool ret = xhas_in ();
- sync.unlock ();
return ret;
}
bool xs::socket_base_t::has_out ()
{
- sync.lock ();
bool ret = xhas_out ();
- sync.unlock ();
return ret;
}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 850586e..8b9a948 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -65,6 +65,10 @@ namespace xs
// This function can be called from a different thread!
void stop ();
+ // Synchronise access of application threads to the socket.
+ void lock ();
+ void unlock ();
+
// Interface for communication with the API layer.
int setsockopt (int option_, const void *optval_, size_t optvallen_);
int getsockopt (int option_, void *optval_, size_t *optvallen_);
@@ -143,6 +147,7 @@ namespace xs
// Synchronisation of access to the socket. If Crossroads are running
// in non-reentrant mode, it is a dummy mutex-like object.
+ bool reentrant;
mutex_t sync;
// If true, associated context was already terminated.
diff --git a/src/xs.cpp b/src/xs.cpp
index ad5bd97..7160da9 100644
--- a/src/xs.cpp
+++ b/src/xs.cpp
@@ -202,40 +202,54 @@ int xs_close (void *s_)
int xs_setsockopt (void *s_, int option_, const void *optval_,
size_t optvallen_)
{
- if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) {
+ xs::socket_base_t *s = (xs::socket_base_t*) s_;
+ if (!s || !s->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
- return (((xs::socket_base_t*) s_)->setsockopt (option_, optval_,
- optvallen_));
+ s->lock ();
+ int rc = s->setsockopt (option_, optval_, optvallen_);
+ s->unlock ();
+ return rc;
}
int xs_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
- if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) {
+ xs::socket_base_t *s = (xs::socket_base_t*) s_;
+ if (!s || !s->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
- return (((xs::socket_base_t*) s_)->getsockopt (option_, optval_,
- optvallen_));
+ s->lock ();
+ int rc = s->getsockopt (option_, optval_, optvallen_);
+ s->unlock ();
+ return rc;
}
int xs_bind (void *s_, const char *addr_)
{
- if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) {
+ xs::socket_base_t *s = (xs::socket_base_t*) s_;
+ if (!s || !s->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
- return (((xs::socket_base_t*) s_)->bind (addr_));
+ s->lock ();
+ int rc = s->bind (addr_);
+ s->unlock ();
+ return rc;
}
int xs_connect (void *s_, const char *addr_)
{
- if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) {
+ xs::socket_base_t *s = (xs::socket_base_t*) s_;
+ if (!s || !s->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
- return (((xs::socket_base_t*) s_)->connect (addr_));
+ s->lock ();
+ int rc = s->connect (addr_);
+ s->unlock ();
+ return rc;
}
int xs_send (void *s_, const void *buf_, size_t len_, int flags_)
@@ -288,12 +302,15 @@ int xs_recv (void *s_, void *buf_, size_t len_, int flags_)
int xs_sendmsg (void *s_, xs_msg_t *msg_, int flags_)
{
- if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) {
+ xs::socket_base_t *s = (xs::socket_base_t*) s_;
+ if (!s || !s->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
int sz = (int) xs_msg_size (msg_);
- int rc = (((xs::socket_base_t*) s_)->send ((xs::msg_t*) msg_, flags_));
+ s->lock ();
+ int rc = s->send ((xs::msg_t*) msg_, flags_);
+ s->unlock ();
if (unlikely (rc < 0))
return -1;
return sz;
@@ -301,11 +318,14 @@ int xs_sendmsg (void *s_, xs_msg_t *msg_, int flags_)
int xs_recvmsg (void *s_, xs_msg_t *msg_, int flags_)
{
- if (!s_ || !((xs::socket_base_t*) s_)->check_tag ()) {
+ xs::socket_base_t *s = (xs::socket_base_t*) s_;
+ if (!s || !s->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
- int rc = (((xs::socket_base_t*) s_)->recv ((xs::msg_t*) msg_, flags_));
+ s->lock ();
+ int rc = s->recv ((xs::msg_t*) msg_, flags_);
+ s->unlock ();
if (unlikely (rc < 0))
return -1;
return (int) xs_msg_size (msg_);