summaryrefslogtreecommitdiff
path: root/src/signaler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/signaler.cpp')
-rw-r--r--src/signaler.cpp86
1 files changed, 55 insertions, 31 deletions
diff --git a/src/signaler.cpp b/src/signaler.cpp
index d4a9214..787ce4a 100644
--- a/src/signaler.cpp
+++ b/src/signaler.cpp
@@ -49,10 +49,10 @@ zmq::signaler_t::signaler_t ()
struct sockaddr_in addr;
SOCKET listener;
int addrlen = sizeof (addr);
-
+
w = INVALID_SOCKET;
r = INVALID_SOCKET;
-
+
fd_t rcs = (listener = socket (AF_INET, SOCK_STREAM, 0));
wsa_assert (rcs != INVALID_SOCKET);
@@ -60,25 +60,34 @@ zmq::signaler_t::signaler_t ()
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
addr.sin_port = 0;
-
+
int rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
wsa_assert (rc != SOCKET_ERROR);
rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
wsa_assert (rc != SOCKET_ERROR);
-
+
// Listen for incomming connections.
rc = listen (listener, 1);
wsa_assert (rc != SOCKET_ERROR);
-
+
// Create the socket.
w = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
wsa_assert (w != INVALID_SOCKET);
-
+
+ // Increase signaler SNDBUF if requested in config.hpp.
+ if (signaler_sndbuf_size) {
+ int sndbuf = signaler_sndbuf_size;
+ socklen_t sndbuf_size = sizeof sndbuf;
+ rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, (const char *)&sndbuf,
+ sndbuf_size);
+ errno_assert (rc == 0);
+ }
+
// Connect to the remote peer.
rc = connect (w, (sockaddr *) &addr, sizeof (addr));
wsa_assert (rc != SOCKET_ERROR);
-
+
// Accept connection from w.
r = accept (listener, NULL, NULL);
wsa_assert (r != INVALID_SOCKET);
@@ -112,7 +121,7 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (rc == sizeof (command_t));
}
-bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
+int zmq::signaler_t::recv (command_t *cmd_, bool block_)
{
if (block_) {
@@ -122,10 +131,12 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
wsa_assert (rc != SOCKET_ERROR);
}
- bool result;
+ int err;
+ int result;
int nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0);
if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) {
- result = false;
+ err = EAGAIN;
+ result = -1;
}
else {
wsa_assert (nbytes != -1);
@@ -133,7 +144,7 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
// Check whether we haven't got half of a signal.
zmq_assert (nbytes % sizeof (uint32_t) == 0);
- result = true;
+ result = 0;
}
if (block_) {
@@ -144,6 +155,8 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
wsa_assert (rc != SOCKET_ERROR);
}
+ if (result == -1)
+ errno = err;
return result;
}
@@ -166,6 +179,14 @@ zmq::signaler_t::signaler_t ()
flags = 0;
rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc != -1);
+
+ // Increase signaler SNDBUF if requested in config.hpp.
+ if (signaler_sndbuf_size) {
+ int sndbuf = signaler_sndbuf_size;
+ socklen_t sndbuf_size = sizeof sndbuf;
+ rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &sndbuf, sndbuf_size);
+ errno_assert (rc == 0);
+ }
}
zmq::signaler_t::~signaler_t ()
@@ -184,7 +205,7 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t));
}
-bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
+int zmq::signaler_t::recv (command_t *cmd_, bool block_)
{
if (block_) {
@@ -196,13 +217,12 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
errno_assert (rc != -1);
}
- bool result;
- ssize_t nbytes;
- do {
- nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0);
- } while (nbytes == -1 && errno == EINTR);
- if (nbytes == -1 && errno == EAGAIN) {
- result = false;
+ int err;
+ int result;
+ ssize_t nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0);
+ if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) {
+ err = errno;
+ result = -1;
}
else {
zmq_assert (nbytes != -1);
@@ -210,7 +230,7 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
// Check whether we haven't got half of command.
zmq_assert (nbytes == sizeof (command_t));
- result = true;
+ result = 0;
}
if (block_) {
@@ -223,6 +243,8 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
errno_assert (rc != -1);
}
+ if (result == -1)
+ errno = err;
return result;
}
@@ -243,6 +265,14 @@ zmq::signaler_t::signaler_t ()
errno_assert (rc == 0);
w = sv [0];
r = sv [1];
+
+ // Increase signaler SNDBUF if requested in config.hpp.
+ if (signaler_sndbuf_size) {
+ int sndbuf = signaler_sndbuf_size;
+ socklen_t sndbuf_size = sizeof sndbuf;
+ rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &sndbuf, sndbuf_size);
+ errno_assert (rc == 0);
+ }
}
zmq::signaler_t::~signaler_t ()
@@ -266,24 +296,18 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t));
}
-bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
+int zmq::signaler_t::recv (command_t *cmd_, bool block_)
{
ssize_t nbytes;
- do {
- nbytes = ::recv (r, cmd_, sizeof (command_t),
- block_ ? 0 : MSG_DONTWAIT);
- } while (nbytes == -1 && errno == EINTR);
-
- // If there's no signal available return false.
- if (nbytes == -1 && errno == EAGAIN)
- return false;
-
+ nbytes = ::recv (r, cmd_, sizeof (command_t), block_ ? 0 : MSG_DONTWAIT);
+ if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
+ return -1;
errno_assert (nbytes != -1);
// Check whether we haven't got half of command.
zmq_assert (nbytes == sizeof (command_t));
- return true;
+ return 0;
}
#endif