From 7668b246fc3cf4a2a3b3ee9b1283ad8a4b12ac4f Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 20 Sep 2009 12:03:14 +0200 Subject: ZMQ_POLL option forces fd_signaler to be used in app_thread --- src/fd_signaler.cpp | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) (limited to 'src/fd_signaler.cpp') diff --git a/src/fd_signaler.cpp b/src/fd_signaler.cpp index 52c3129..8c71356 100644 --- a/src/fd_signaler.cpp +++ b/src/fd_signaler.cpp @@ -65,12 +65,18 @@ void zmq::fd_signaler_t::signal (int signal_) errno_assert (sz == sizeof (uint64_t)); } +uint64_t zmq::fd_signaler_t::poll () +{ + // TODO: Can we do a blocking read on non-blocking eventfd? + // It's not needed as for now, so let it stay unimplemented. + zmq_assert (false); +} + uint64_t zmq::fd_signaler_t::check () { uint64_t val; ssize_t sz = read (fd, &val, sizeof (uint64_t)); - if (sz == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || - errno == EINTR)) + if (sz == -1 && (errno == EAGAIN || errno == EINTR)) return 0; errno_assert (sz != -1); return val; @@ -120,7 +126,7 @@ zmq::fd_signaler_t::fd_signaler_t () rc = connect (w, (sockaddr *) &addr, sizeof (addr)); wsa_assert (rc != SOCKET_ERROR); - // Accept connection from w + // Accept connection from w. r = accept (listener, NULL, NULL); wsa_assert (r != INVALID_SOCKET); @@ -139,6 +145,9 @@ zmq::fd_signaler_t::~fd_signaler_t () void zmq::fd_signaler_t::signal (int signal_) { + // TODO: Note that send is a blocking operation. + // How should we behave if the signal cannot be written to the signaler? + zmq_assert (signal_ >= 0 && signal_ < 64); char c = (char) signal_; int rc = send (w, &c, 1, 0); @@ -154,7 +163,7 @@ uint64_t zmq::fd_signaler_t::poll () // If there are no signals, wait until at least one signal arrives. unsigned char sig; - int nbytes = recv (r, (char*) &sig, 1, MSG_WAITALL); + int nbytes = recv (r, (char*) &sig, 1, 0); win_assert (nbytes != -1); return uint64_t (1) << sig; } @@ -162,7 +171,9 @@ uint64_t zmq::fd_signaler_t::poll () uint64_t zmq::fd_signaler_t::check () { unsigned char buffer [32]; - int nbytes = recv (r, (char*) buffer, 32, 0); + int nbytes = recv (r, (char*) buffer, 32, MSG_DONTWAIT); + if (nbytes == -1 && errno == EAGAIN) + return 0; win_assert (nbytes != -1); uint64_t signals = 0; @@ -190,13 +201,6 @@ zmq::fd_signaler_t::fd_signaler_t () errno_assert (rc == 0); w = sv [0]; r = sv [1]; - - // Set to non-blocking mode. - int flags = fcntl (r, F_GETFL, 0); - if (flags == -1) - flags = 0; - rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc != -1); } zmq::fd_signaler_t::~fd_signaler_t () @@ -207,6 +211,9 @@ zmq::fd_signaler_t::~fd_signaler_t () void zmq::fd_signaler_t::signal (int signal_) { + // TODO: Note that send is a blocking operation. + // How should we behave if the signal cannot be written to the signaler? + zmq_assert (signal_ >= 0 && signal_ < 64); unsigned char c = (unsigned char) signal_; ssize_t nbytes = send (w, &c, 1, 0); @@ -222,7 +229,7 @@ uint64_t zmq::fd_signaler_t::poll () // If there are no signals, wait until at least one signal arrives. unsigned char sig; - ssize_t nbytes = recv (r, &sig, 1, MSG_WAITALL); + ssize_t nbytes = recv (r, &sig, 1, 0); errno_assert (nbytes != -1); return uint64_t (1) << sig; } @@ -230,8 +237,10 @@ uint64_t zmq::fd_signaler_t::poll () uint64_t zmq::fd_signaler_t::check () { unsigned char buffer [32]; - ssize_t nbytes = recv (r, buffer, 32, 0); - errno_assert (nbytes != -1); + ssize_t nbytes = recv (r, buffer, 32, MSG_DONTWAIT); + if (nbytes == -1 && errno == EAGAIN) + return 0; + zmq_assert (nbytes != -1); uint64_t signals = 0; for (int pos = 0; pos != nbytes; pos ++) { -- cgit v1.2.3