From 235ed3a3dcffb7c658cbc9253eae9de54db24533 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 4 May 2010 10:22:16 +0200 Subject: signaler transports commands per se rather than one-bit signals --- src/signaler.cpp | 224 +++++++++++++++++++++---------------------------------- 1 file changed, 85 insertions(+), 139 deletions(-) (limited to 'src/signaler.cpp') diff --git a/src/signaler.cpp b/src/signaler.cpp index e1fa6a3..0fa43f7 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -30,52 +30,9 @@ #else #include #include +#include #endif -const uint32_t zmq::signaler_t::no_signal = 0xffffffff; - -uint32_t zmq::signaler_t::poll () -{ - // Return next signal. - if (current != count) { - uint32_t result = buffer [current]; - current++; - return result; - } - - // If there is no signal buffered, poll for new signals. - xpoll (); - - // Return first signal. - zmq_assert (current != count); - uint32_t result = buffer [current]; - current++; - return result; -} - -uint32_t zmq::signaler_t::check () -{ - // Return next signal. - if (current != count) { - uint32_t result = buffer [current]; - current++; - return result; - } - - // If there is no signal buffered, check whether more signals - // can be obtained. - xcheck (); - - // Return first signal if any. - if (current != count) { - uint32_t result = buffer [current]; - current++; - return result; - } - - return no_signal; -} - zmq::fd_t zmq::signaler_t::get_fd () { return r; @@ -84,8 +41,6 @@ zmq::fd_t zmq::signaler_t::get_fd () #if defined ZMQ_HAVE_WINDOWS zmq::signaler_t::signaler_t () : - current (0), - count (0) { // Windows have no 'socketpair' function. CreatePipe is no good as pipe // handles cannot be polled on. Here we create the socketpair by hand. @@ -146,51 +101,49 @@ zmq::signaler_t::~signaler_t () wsa_assert (rc != SOCKET_ERROR); } -void zmq::signaler_t::signal (uint32_t signal_) +void zmq::signaler_t::send (const command_t &cmd_) { // TODO: Note that send is a blocking operation. // How should we behave if the signal cannot be written to the signaler? - int rc = send (w, (char*) &signal_, sizeof (signal_), 0); + // Even worse: What if half of a command is written? + int rc = send (w, (char*) &cmd_, sizeof (command_t), 0); win_assert (rc != SOCKET_ERROR); - zmq_assert (rc == sizeof (signal_)); + zmq_assert (rc == sizeof (command_t)); } -void zmq::signaler_t::xpoll () +bool zmq::signaler_t::recv (command_t *cmd_, bool block_) { - // Switch to blocking mode. - unsigned long argp = 0; - int rc = ioctlsocket (r, FIONBIO, &argp); - wsa_assert (rc != SOCKET_ERROR); + if (block_) { - // Get the signals. Given that we are in the blocking mode now, - // there should be at least a single signal returned. - xcheck (); - zmq_assert (current != count); + // Switch to blocking mode. + unsigned long argp = 0; + int rc = ioctlsocket (r, FIONBIO, &argp); + wsa_assert (rc != SOCKET_ERROR); + } - // Switch back to non-blocking mode. - argp = 1; - rc = ioctlsocket (r, FIONBIO, &argp); - wsa_assert (rc != SOCKET_ERROR); -} + bool result; + int nbytes = recv (r, (char*) cmd_, sizeof (command_t), 0); + if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) { + result = false; + } + else { + wsa_assert (nbytes != -1); -void zmq::signaler_t::xcheck () -{ - int nbytes = recv (r, (char*) buffer, sizeof (buffer), 0); + // Check whether we haven't got half of a signal. + zmq_assert (nbytes % sizeof (uint32_t) == 0); - // No signals are available. - if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) { - current = 0; - count = 0; - return; + result = true; } - wsa_assert (nbytes != -1); + if (block_) { - // Check whether we haven't got half of a signal. - zmq_assert (nbytes % sizeof (uint32_t) == 0); + // Switch back to non-blocking mode. + unsigned long argp = 1; + int rc = ioctlsocket (r, FIONBIO, &argp); + wsa_assert (rc != SOCKET_ERROR); + } - current = 0; - count = nbytes / sizeof (uint32_t); + return result; } #elif defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX @@ -198,9 +151,7 @@ void zmq::signaler_t::xcheck () #include #include -zmq::signaler_t::signaler_t () : - current (0), - count (0) +zmq::signaler_t::signaler_t () { int sv [2]; int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); @@ -222,49 +173,50 @@ zmq::signaler_t::~signaler_t () close (r); } -void zmq::signaler_t::signal (uint32_t signal_) +void zmq::signaler_t::send (const command_t &cmd_) { - ssize_t nbytes = send (w, &signal_, sizeof (signal_), 0); + ssize_t nbytes = send (w, &cmd_, sizeof (command_t), 0); errno_assert (nbytes != -1); - zmq_assert (nbytes == sizeof (signal_); + zmq_assert (nbytes == sizeof (command_t)); } -void zmq::signaler_t::xpoll () +bool zmq::signaler_t::recv (command_t &cmd_, bool block_) { - // Set the reader to blocking mode. - int flags = fcntl (r, F_GETFL, 0); - if (flags == -1) - flags = 0; - int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK); - errno_assert (rc != -1); + if (block_) { + + // Set the reader to blocking mode. + int flags = fcntl (r, F_GETFL, 0); + if (flags == -1) + flags = 0; + int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK); + errno_assert (rc != -1); + } - // Poll for events. - xcheck (); - zmq_assert (current != count); + bool result; + ssize_t nbytes = recv (r, buffer, sizeof (command_t), 0); + if (nbytes == -1 && errno == EAGAIN) { + result = false; + } + else { + zmq_assert (nbytes != -1); - // Set the reader to non-blocking mode. - flags = fcntl (r, F_GETFL, 0); - if (flags == -1) - flags = 0; - rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc != -1); -} + // Check whether we haven't got half of command. + zmq_assert (nbytes == sizeof (command_t)); -void zmq::signaler_t::xcheck () -{ - ssize_t nbytes = recv (r, buffer, sizeof (buffer), 0); - if (nbytes == -1 && errno == EAGAIN) { - current = 0; - count = 0; - return; + result = true; } - zmq_assert (nbytes != -1); - // Check whether we haven't got half of a signal. - zmq_assert (nbytes % sizeof (uint32_t) == 0); + if (block_) + + // Set the reader to non-blocking mode. + int flags = fcntl (r, F_GETFL, 0); + if (flags == -1) + flags = 0; + int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); + errno_assert (rc != -1); + } - current = 0; - count = nbytes / sizeof (uint32_t); + return result; } #else @@ -272,10 +224,13 @@ void zmq::signaler_t::xcheck () #include #include -zmq::signaler_t::signaler_t () : - current (0), - count (0) +zmq::signaler_t::signaler_t () { + // Make sure that command can be written to the socket in atomic fashion. + // If this wasn't guaranteed, commands from different threads would be + // interleaved. + zmq_assert (sizeof (command_t) <= PIPE_BUF); + int sv [2]; int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); errno_assert (rc == 0); @@ -289,42 +244,33 @@ zmq::signaler_t::~signaler_t () close (r); } -void zmq::signaler_t::signal (uint32_t signal_) +void zmq::signaler_t::send (const command_t &cmd_) { // TODO: Note that send is a blocking operation. - // How should we behave if the signal cannot be written to the signaler? - ssize_t nbytes = send (w, &signal_, sizeof (signal_), 0); + // How should we behave if the command cannot be written to the signaler? + ssize_t nbytes = ::send (w, &cmd_, sizeof (command_t), 0); errno_assert (nbytes != -1); - zmq_assert (nbytes == sizeof (signal_)); + + // This should never happen as we've already checked that command size is + // less than PIPE_BUF. + zmq_assert (nbytes == sizeof (command_t)); } -void zmq::signaler_t::xpoll () +bool zmq::signaler_t::recv (command_t *cmd_, bool block_) { - ssize_t nbytes = recv (r, buffer, sizeof (buffer), 0); - errno_assert (nbytes != -1); - - // Check whether we haven't got half of a signal. - zmq_assert (nbytes % sizeof (uint32_t) == 0); + ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), + block_ ? 0 : MSG_DONTWAIT); - current = 0; - count = nbytes / sizeof (uint32_t); -} + // If there's no signal available return false. + if (nbytes == -1 && errno == EAGAIN) + return false; -void zmq::signaler_t::xcheck () -{ - ssize_t nbytes = recv (r, buffer, 64, MSG_DONTWAIT); - if (nbytes == -1 && errno == EAGAIN) { - current = 0; - count = 0; - return; - } errno_assert (nbytes != -1); - // Check whether we haven't got half of a signal. - zmq_assert (nbytes % sizeof (uint32_t) == 0); + // Check whether we haven't got half of command. + zmq_assert (nbytes == sizeof (command_t)); - current = 0; - count = nbytes / sizeof (uint32_t); + return true; } #endif -- cgit v1.2.3