From ae93ed318a450d6d763a5f629d478467f7362b07 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 29 Apr 2010 20:34:48 +0200 Subject: signaler rewritten in such a way that any number (>64) of threads can be used --- src/signaler.cpp | 200 +++++++++++++++++++++++++++++++++---------------------- 1 file changed, 119 insertions(+), 81 deletions(-) (limited to 'src/signaler.cpp') diff --git a/src/signaler.cpp b/src/signaler.cpp index 1d3b32f..2d89eca 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -32,9 +32,60 @@ #include #endif +const uint32_t zmq::signaler_t::no_signal = 0xffffffff; + +uint32_t zmq::signaler_t::poll () +{ + // Return next signal. + if (current != count) { + uint32_t result = buffer [current]; + current++; + return result; + } + + // If there is no signal buffered, poll for new signals. + xpoll (); + + // Return first signal. + zmq_assert (current != count); + uint32_t result = buffer [current]; + current++; + return result; +} + +uint32_t zmq::signaler_t::check () +{ + // Return next signal. + if (current != count) { + uint32_t result = buffer [current]; + current++; + return result; + } + + // If there is no signal buffered, check whether more signals + // can be obtained. + xcheck (); + + // Return first signal if any. + if (current != count) { + uint32_t result = buffer [current]; + current++; + return result; + } + + return no_signal; +} + +zmq::fd_t zmq::signaler_t::get_fd () +{ + return r; +} + #if defined ZMQ_HAVE_WINDOWS -zmq::signaler_t::signaler_t () +zmq::signaler_t::signaler_t () : + current (0), + count (0) { // Windows have no 'socketpair' function. CreatePipe is no good as pipe // handles cannot be polled on. Here we create the socketpair by hand. @@ -95,18 +146,16 @@ zmq::signaler_t::~signaler_t () wsa_assert (rc != SOCKET_ERROR); } -void zmq::signaler_t::signal (int signal_) +void zmq::signaler_t::signal (uint32_t signal_) { // TODO: Note that send is a blocking operation. // How should we behave if the signal cannot be written to the signaler? - - zmq_assert (signal_ >= 0 && signal_ < 64); - char c = (char) signal_; - int rc = send (w, &c, 1, 0); + int rc = send (w, &signal_, sizeof (signal_), 0); win_assert (rc != SOCKET_ERROR); + zmq_assert (rc == sizeof (signal_)); } -uint64_t zmq::signaler_t::poll () +void zmq::signaler_t::xpoll () { // Switch to blocking mode. unsigned long argp = 0; @@ -115,8 +164,8 @@ uint64_t zmq::signaler_t::poll () // Get the signals. Given that we are in the blocking mode now, // there should be at least a single signal returned. - uint64_t signals = check (); - zmq_assert (signals); + xcheck (); + zmq_assert (current != count); // Switch back to non-blocking mode. argp = 1; @@ -126,25 +175,24 @@ uint64_t zmq::signaler_t::poll () return signals; } -uint64_t zmq::signaler_t::check () +void zmq::signaler_t::xcheck () { - unsigned char buffer [32]; - int nbytes = recv (r, (char*) buffer, 32, 0); - if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) - return 0; - wsa_assert (nbytes != -1); + int nbytes = recv (r, (char*) buffer, sizeof (buffer), 0); - uint64_t signals = 0; - for (int pos = 0; pos != nbytes; pos++) { - zmq_assert (buffer [pos] < 64); - signals |= (uint64_t (1) << (buffer [pos])); + // No signals are available. + if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) { + current = 0; + count = 0; + return; } - return signals; -} -zmq::fd_t zmq::signaler_t::get_fd () -{ - return r; + wsa_assert (nbytes != -1); + + // Check whether we haven't got half of a signal. + zmq_assert (nbytes % sizeof (uint32_t) == 0); + + current = 0; + count = nbytes / sizeof (uint32_t); } #elif defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX @@ -152,7 +200,9 @@ zmq::fd_t zmq::signaler_t::get_fd () #include #include -zmq::signaler_t::signaler_t () +zmq::signaler_t::signaler_t () : + current (0), + count (0) { int sv [2]; int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); @@ -174,15 +224,14 @@ zmq::signaler_t::~signaler_t () close (r); } -void zmq::signaler_t::signal (int signal_) +void zmq::signaler_t::signal (uint32_t signal_) { - zmq_assert (signal_ >= 0 && signal_ < 64); - unsigned char c = (unsigned char) signal_; - ssize_t nbytes = send (w, &c, 1, 0); - errno_assert (nbytes == 1); + ssize_t nbytes = send (w, &signal_, sizeof (signal_), 0); + errno_assert (nbytes != -1); + zmq_assert (nbytes == sizeof (signal_); } -uint64_t zmq::signaler_t::poll () +void zmq::signaler_t::xpoll () { // Set the reader to blocking mode. int flags = fcntl (r, F_GETFL, 0); @@ -192,7 +241,8 @@ uint64_t zmq::signaler_t::poll () errno_assert (rc != -1); // Poll for events. - uint64_t signals = check (); + xcheck (); + zmq_assert (current != count); // Set the reader to non-blocking mode. flags = fcntl (r, F_GETFL, 0); @@ -200,29 +250,23 @@ uint64_t zmq::signaler_t::poll () flags = 0; rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); errno_assert (rc != -1); - - return signals; } -uint64_t zmq::signaler_t::check () +void zmq::signaler_t::xcheck () { - unsigned char buffer [64]; - ssize_t nbytes = recv (r, buffer, 64, 0); - if (nbytes == -1 && errno == EAGAIN) - return 0; + ssize_t nbytes = recv (r, buffer, sizeof (buffer), 0); + if (nbytes == -1 && errno == EAGAIN) { + current = 0; + count = 0; + return; + } zmq_assert (nbytes != -1); - uint64_t signals = 0; - for (int pos = 0; pos != nbytes; pos ++) { - zmq_assert (buffer [pos] < 64); - signals |= (uint64_t (1) << (buffer [pos])); - } - return signals; -} + // Check whether we haven't got half of a signal. + zmq_assert (nbytes % sizeof (uint32_t) == 0); -zmq::fd_t zmq::signaler_t::get_fd () -{ - return r; + current = 0; + count = nbytes / sizeof (uint32_t); } #else @@ -230,7 +274,9 @@ zmq::fd_t zmq::signaler_t::get_fd () #include #include -zmq::signaler_t::signaler_t () +zmq::signaler_t::signaler_t () : + current (0), + count (0) { int sv [2]; int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); @@ -245,50 +291,42 @@ zmq::signaler_t::~signaler_t () close (r); } -void zmq::signaler_t::signal (int signal_) +void zmq::signaler_t::signal (uint32_t signal_) { // TODO: Note that send is a blocking operation. // How should we behave if the signal cannot be written to the signaler? - - zmq_assert (signal_ >= 0 && signal_ < 64); - unsigned char c = (unsigned char) signal_; - ssize_t nbytes = send (w, &c, 1, 0); - errno_assert (nbytes == 1); + ssize_t nbytes = send (w, &signal_, sizeof (signal_), 0); + errno_assert (nbytes != -1); + zmq_assert (nbytes == sizeof (signal_)); } -uint64_t zmq::signaler_t::poll () +void zmq::signaler_t::xpoll () { - unsigned char buffer [64]; - ssize_t nbytes = recv (r, buffer, 64, 0); - zmq_assert (nbytes != -1); + ssize_t nbytes = recv (r, buffer, sizeof (buffer), 0); + errno_assert (nbytes != -1); + + // Check whether we haven't got half of a signal. + zmq_assert (nbytes % sizeof (uint32_t) == 0); - uint64_t signals = 0; - for (int pos = 0; pos != nbytes; pos ++) { - zmq_assert (buffer [pos] < 64); - signals |= (uint64_t (1) << (buffer [pos])); - } - return signals; + current = 0; + count = nbytes / sizeof (uint32_t); } -uint64_t zmq::signaler_t::check () +void zmq::signaler_t::xcheck () { - unsigned char buffer [64]; ssize_t nbytes = recv (r, buffer, 64, MSG_DONTWAIT); - if (nbytes == -1 && errno == EAGAIN) - return 0; - zmq_assert (nbytes != -1); - - uint64_t signals = 0; - for (int pos = 0; pos != nbytes; pos ++) { - zmq_assert (buffer [pos] < 64); - signals |= (uint64_t (1) << (buffer [pos])); + if (nbytes == -1 && errno == EAGAIN) { + current = 0; + count = 0; + return; } - return signals; -} + errno_assert (nbytes != -1); -zmq::fd_t zmq::signaler_t::get_fd () -{ - return r; + // Check whether we haven't got half of a signal. + zmq_assert (nbytes % sizeof (uint32_t) == 0); + + current = 0; + count = nbytes / sizeof (uint32_t); } #endif -- cgit v1.2.3