diff options
Diffstat (limited to 'src/signaler.cpp')
-rw-r--r-- | src/signaler.cpp | 86 |
1 files changed, 55 insertions, 31 deletions
diff --git a/src/signaler.cpp b/src/signaler.cpp index d4a9214..787ce4a 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -49,10 +49,10 @@ zmq::signaler_t::signaler_t () struct sockaddr_in addr; SOCKET listener; int addrlen = sizeof (addr); - + w = INVALID_SOCKET; r = INVALID_SOCKET; - + fd_t rcs = (listener = socket (AF_INET, SOCK_STREAM, 0)); wsa_assert (rcs != INVALID_SOCKET); @@ -60,25 +60,34 @@ zmq::signaler_t::signaler_t () addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); addr.sin_port = 0; - + int rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr)); wsa_assert (rc != SOCKET_ERROR); rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen); wsa_assert (rc != SOCKET_ERROR); - + // Listen for incomming connections. rc = listen (listener, 1); wsa_assert (rc != SOCKET_ERROR); - + // Create the socket. w = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); wsa_assert (w != INVALID_SOCKET); - + + // Increase signaler SNDBUF if requested in config.hpp. + if (signaler_sndbuf_size) { + int sndbuf = signaler_sndbuf_size; + socklen_t sndbuf_size = sizeof sndbuf; + rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, (const char *)&sndbuf, + sndbuf_size); + errno_assert (rc == 0); + } + // Connect to the remote peer. rc = connect (w, (sockaddr *) &addr, sizeof (addr)); wsa_assert (rc != SOCKET_ERROR); - + // Accept connection from w. r = accept (listener, NULL, NULL); wsa_assert (r != INVALID_SOCKET); @@ -112,7 +121,7 @@ void zmq::signaler_t::send (const command_t &cmd_) zmq_assert (rc == sizeof (command_t)); } -bool zmq::signaler_t::recv (command_t *cmd_, bool block_) +int zmq::signaler_t::recv (command_t *cmd_, bool block_) { if (block_) { @@ -122,10 +131,12 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_) wsa_assert (rc != SOCKET_ERROR); } - bool result; + int err; + int result; int nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0); if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) { - result = false; + err = EAGAIN; + result = -1; } else { wsa_assert (nbytes != -1); @@ -133,7 +144,7 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_) // Check whether we haven't got half of a signal. zmq_assert (nbytes % sizeof (uint32_t) == 0); - result = true; + result = 0; } if (block_) { @@ -144,6 +155,8 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_) wsa_assert (rc != SOCKET_ERROR); } + if (result == -1) + errno = err; return result; } @@ -166,6 +179,14 @@ zmq::signaler_t::signaler_t () flags = 0; rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); errno_assert (rc != -1); + + // Increase signaler SNDBUF if requested in config.hpp. + if (signaler_sndbuf_size) { + int sndbuf = signaler_sndbuf_size; + socklen_t sndbuf_size = sizeof sndbuf; + rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &sndbuf, sndbuf_size); + errno_assert (rc == 0); + } } zmq::signaler_t::~signaler_t () @@ -184,7 +205,7 @@ void zmq::signaler_t::send (const command_t &cmd_) zmq_assert (nbytes == sizeof (command_t)); } -bool zmq::signaler_t::recv (command_t *cmd_, bool block_) +int zmq::signaler_t::recv (command_t *cmd_, bool block_) { if (block_) { @@ -196,13 +217,12 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_) errno_assert (rc != -1); } - bool result; - ssize_t nbytes; - do { - nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0); - } while (nbytes == -1 && errno == EINTR); - if (nbytes == -1 && errno == EAGAIN) { - result = false; + int err; + int result; + ssize_t nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0); + if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) { + err = errno; + result = -1; } else { zmq_assert (nbytes != -1); @@ -210,7 +230,7 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_) // Check whether we haven't got half of command. zmq_assert (nbytes == sizeof (command_t)); - result = true; + result = 0; } if (block_) { @@ -223,6 +243,8 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_) errno_assert (rc != -1); } + if (result == -1) + errno = err; return result; } @@ -243,6 +265,14 @@ zmq::signaler_t::signaler_t () errno_assert (rc == 0); w = sv [0]; r = sv [1]; + + // Increase signaler SNDBUF if requested in config.hpp. + if (signaler_sndbuf_size) { + int sndbuf = signaler_sndbuf_size; + socklen_t sndbuf_size = sizeof sndbuf; + rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &sndbuf, sndbuf_size); + errno_assert (rc == 0); + } } zmq::signaler_t::~signaler_t () @@ -266,24 +296,18 @@ void zmq::signaler_t::send (const command_t &cmd_) zmq_assert (nbytes == sizeof (command_t)); } -bool zmq::signaler_t::recv (command_t *cmd_, bool block_) +int zmq::signaler_t::recv (command_t *cmd_, bool block_) { ssize_t nbytes; - do { - nbytes = ::recv (r, cmd_, sizeof (command_t), - block_ ? 0 : MSG_DONTWAIT); - } while (nbytes == -1 && errno == EINTR); - - // If there's no signal available return false. - if (nbytes == -1 && errno == EAGAIN) - return false; - + nbytes = ::recv (r, cmd_, sizeof (command_t), block_ ? 0 : MSG_DONTWAIT); + if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) + return -1; errno_assert (nbytes != -1); // Check whether we haven't got half of command. zmq_assert (nbytes == sizeof (command_t)); - return true; + return 0; } #endif |