summaryrefslogtreecommitdiff
path: root/src/fd_signaler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/fd_signaler.cpp')
-rw-r--r--src/fd_signaler.cpp39
1 files changed, 24 insertions, 15 deletions
diff --git a/src/fd_signaler.cpp b/src/fd_signaler.cpp
index 52c3129..8c71356 100644
--- a/src/fd_signaler.cpp
+++ b/src/fd_signaler.cpp
@@ -65,12 +65,18 @@ void zmq::fd_signaler_t::signal (int signal_)
errno_assert (sz == sizeof (uint64_t));
}
+uint64_t zmq::fd_signaler_t::poll ()
+{
+ // TODO: Can we do a blocking read on non-blocking eventfd?
+ // It's not needed as for now, so let it stay unimplemented.
+ zmq_assert (false);
+}
+
uint64_t zmq::fd_signaler_t::check ()
{
uint64_t val;
ssize_t sz = read (fd, &val, sizeof (uint64_t));
- if (sz == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
- errno == EINTR))
+ if (sz == -1 && (errno == EAGAIN || errno == EINTR))
return 0;
errno_assert (sz != -1);
return val;
@@ -120,7 +126,7 @@ zmq::fd_signaler_t::fd_signaler_t ()
rc = connect (w, (sockaddr *) &addr, sizeof (addr));
wsa_assert (rc != SOCKET_ERROR);
- // Accept connection from w
+ // Accept connection from w.
r = accept (listener, NULL, NULL);
wsa_assert (r != INVALID_SOCKET);
@@ -139,6 +145,9 @@ zmq::fd_signaler_t::~fd_signaler_t ()
void zmq::fd_signaler_t::signal (int signal_)
{
+ // TODO: Note that send is a blocking operation.
+ // How should we behave if the signal cannot be written to the signaler?
+
zmq_assert (signal_ >= 0 && signal_ < 64);
char c = (char) signal_;
int rc = send (w, &c, 1, 0);
@@ -154,7 +163,7 @@ uint64_t zmq::fd_signaler_t::poll ()
// If there are no signals, wait until at least one signal arrives.
unsigned char sig;
- int nbytes = recv (r, (char*) &sig, 1, MSG_WAITALL);
+ int nbytes = recv (r, (char*) &sig, 1, 0);
win_assert (nbytes != -1);
return uint64_t (1) << sig;
}
@@ -162,7 +171,9 @@ uint64_t zmq::fd_signaler_t::poll ()
uint64_t zmq::fd_signaler_t::check ()
{
unsigned char buffer [32];
- int nbytes = recv (r, (char*) buffer, 32, 0);
+ int nbytes = recv (r, (char*) buffer, 32, MSG_DONTWAIT);
+ if (nbytes == -1 && errno == EAGAIN)
+ return 0;
win_assert (nbytes != -1);
uint64_t signals = 0;
@@ -190,13 +201,6 @@ zmq::fd_signaler_t::fd_signaler_t ()
errno_assert (rc == 0);
w = sv [0];
r = sv [1];
-
- // Set to non-blocking mode.
- int flags = fcntl (r, F_GETFL, 0);
- if (flags == -1)
- flags = 0;
- rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
- errno_assert (rc != -1);
}
zmq::fd_signaler_t::~fd_signaler_t ()
@@ -207,6 +211,9 @@ zmq::fd_signaler_t::~fd_signaler_t ()
void zmq::fd_signaler_t::signal (int signal_)
{
+ // TODO: Note that send is a blocking operation.
+ // How should we behave if the signal cannot be written to the signaler?
+
zmq_assert (signal_ >= 0 && signal_ < 64);
unsigned char c = (unsigned char) signal_;
ssize_t nbytes = send (w, &c, 1, 0);
@@ -222,7 +229,7 @@ uint64_t zmq::fd_signaler_t::poll ()
// If there are no signals, wait until at least one signal arrives.
unsigned char sig;
- ssize_t nbytes = recv (r, &sig, 1, MSG_WAITALL);
+ ssize_t nbytes = recv (r, &sig, 1, 0);
errno_assert (nbytes != -1);
return uint64_t (1) << sig;
}
@@ -230,8 +237,10 @@ uint64_t zmq::fd_signaler_t::poll ()
uint64_t zmq::fd_signaler_t::check ()
{
unsigned char buffer [32];
- ssize_t nbytes = recv (r, buffer, 32, 0);
- errno_assert (nbytes != -1);
+ ssize_t nbytes = recv (r, buffer, 32, MSG_DONTWAIT);
+ if (nbytes == -1 && errno == EAGAIN)
+ return 0;
+ zmq_assert (nbytes != -1);
uint64_t signals = 0;
for (int pos = 0; pos != nbytes; pos ++) {