summaryrefslogtreecommitdiff
path: root/src/signaler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/signaler.cpp')
-rw-r--r--src/signaler.cpp200
1 files changed, 119 insertions, 81 deletions
diff --git a/src/signaler.cpp b/src/signaler.cpp
index 1d3b32f..2d89eca 100644
--- a/src/signaler.cpp
+++ b/src/signaler.cpp
@@ -32,9 +32,60 @@
#include <fcntl.h>
#endif
+const uint32_t zmq::signaler_t::no_signal = 0xffffffff;
+
+uint32_t zmq::signaler_t::poll ()
+{
+ // Return next signal.
+ if (current != count) {
+ uint32_t result = buffer [current];
+ current++;
+ return result;
+ }
+
+ // If there is no signal buffered, poll for new signals.
+ xpoll ();
+
+ // Return first signal.
+ zmq_assert (current != count);
+ uint32_t result = buffer [current];
+ current++;
+ return result;
+}
+
+uint32_t zmq::signaler_t::check ()
+{
+ // Return next signal.
+ if (current != count) {
+ uint32_t result = buffer [current];
+ current++;
+ return result;
+ }
+
+ // If there is no signal buffered, check whether more signals
+ // can be obtained.
+ xcheck ();
+
+ // Return first signal if any.
+ if (current != count) {
+ uint32_t result = buffer [current];
+ current++;
+ return result;
+ }
+
+ return no_signal;
+}
+
+zmq::fd_t zmq::signaler_t::get_fd ()
+{
+ return r;
+}
+
#if defined ZMQ_HAVE_WINDOWS
-zmq::signaler_t::signaler_t ()
+zmq::signaler_t::signaler_t () :
+ current (0),
+ count (0)
{
// Windows have no 'socketpair' function. CreatePipe is no good as pipe
// handles cannot be polled on. Here we create the socketpair by hand.
@@ -95,18 +146,16 @@ zmq::signaler_t::~signaler_t ()
wsa_assert (rc != SOCKET_ERROR);
}
-void zmq::signaler_t::signal (int signal_)
+void zmq::signaler_t::signal (uint32_t signal_)
{
// TODO: Note that send is a blocking operation.
// How should we behave if the signal cannot be written to the signaler?
-
- zmq_assert (signal_ >= 0 && signal_ < 64);
- char c = (char) signal_;
- int rc = send (w, &c, 1, 0);
+ int rc = send (w, &signal_, sizeof (signal_), 0);
win_assert (rc != SOCKET_ERROR);
+ zmq_assert (rc == sizeof (signal_));
}
-uint64_t zmq::signaler_t::poll ()
+void zmq::signaler_t::xpoll ()
{
// Switch to blocking mode.
unsigned long argp = 0;
@@ -115,8 +164,8 @@ uint64_t zmq::signaler_t::poll ()
// Get the signals. Given that we are in the blocking mode now,
// there should be at least a single signal returned.
- uint64_t signals = check ();
- zmq_assert (signals);
+ xcheck ();
+ zmq_assert (current != count);
// Switch back to non-blocking mode.
argp = 1;
@@ -126,25 +175,24 @@ uint64_t zmq::signaler_t::poll ()
return signals;
}
-uint64_t zmq::signaler_t::check ()
+void zmq::signaler_t::xcheck ()
{
- unsigned char buffer [32];
- int nbytes = recv (r, (char*) buffer, 32, 0);
- if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK)
- return 0;
- wsa_assert (nbytes != -1);
+ int nbytes = recv (r, (char*) buffer, sizeof (buffer), 0);
- uint64_t signals = 0;
- for (int pos = 0; pos != nbytes; pos++) {
- zmq_assert (buffer [pos] < 64);
- signals |= (uint64_t (1) << (buffer [pos]));
+ // No signals are available.
+ if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) {
+ current = 0;
+ count = 0;
+ return;
}
- return signals;
-}
-zmq::fd_t zmq::signaler_t::get_fd ()
-{
- return r;
+ wsa_assert (nbytes != -1);
+
+ // Check whether we haven't got half of a signal.
+ zmq_assert (nbytes % sizeof (uint32_t) == 0);
+
+ current = 0;
+ count = nbytes / sizeof (uint32_t);
}
#elif defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX
@@ -152,7 +200,9 @@ zmq::fd_t zmq::signaler_t::get_fd ()
#include <sys/types.h>
#include <sys/socket.h>
-zmq::signaler_t::signaler_t ()
+zmq::signaler_t::signaler_t () :
+ current (0),
+ count (0)
{
int sv [2];
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
@@ -174,15 +224,14 @@ zmq::signaler_t::~signaler_t ()
close (r);
}
-void zmq::signaler_t::signal (int signal_)
+void zmq::signaler_t::signal (uint32_t signal_)
{
- zmq_assert (signal_ >= 0 && signal_ < 64);
- unsigned char c = (unsigned char) signal_;
- ssize_t nbytes = send (w, &c, 1, 0);
- errno_assert (nbytes == 1);
+ ssize_t nbytes = send (w, &signal_, sizeof (signal_), 0);
+ errno_assert (nbytes != -1);
+ zmq_assert (nbytes == sizeof (signal_);
}
-uint64_t zmq::signaler_t::poll ()
+void zmq::signaler_t::xpoll ()
{
// Set the reader to blocking mode.
int flags = fcntl (r, F_GETFL, 0);
@@ -192,7 +241,8 @@ uint64_t zmq::signaler_t::poll ()
errno_assert (rc != -1);
// Poll for events.
- uint64_t signals = check ();
+ xcheck ();
+ zmq_assert (current != count);
// Set the reader to non-blocking mode.
flags = fcntl (r, F_GETFL, 0);
@@ -200,29 +250,23 @@ uint64_t zmq::signaler_t::poll ()
flags = 0;
rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc != -1);
-
- return signals;
}
-uint64_t zmq::signaler_t::check ()
+void zmq::signaler_t::xcheck ()
{
- unsigned char buffer [64];
- ssize_t nbytes = recv (r, buffer, 64, 0);
- if (nbytes == -1 && errno == EAGAIN)
- return 0;
+ ssize_t nbytes = recv (r, buffer, sizeof (buffer), 0);
+ if (nbytes == -1 && errno == EAGAIN) {
+ current = 0;
+ count = 0;
+ return;
+ }
zmq_assert (nbytes != -1);
- uint64_t signals = 0;
- for (int pos = 0; pos != nbytes; pos ++) {
- zmq_assert (buffer [pos] < 64);
- signals |= (uint64_t (1) << (buffer [pos]));
- }
- return signals;
-}
+ // Check whether we haven't got half of a signal.
+ zmq_assert (nbytes % sizeof (uint32_t) == 0);
-zmq::fd_t zmq::signaler_t::get_fd ()
-{
- return r;
+ current = 0;
+ count = nbytes / sizeof (uint32_t);
}
#else
@@ -230,7 +274,9 @@ zmq::fd_t zmq::signaler_t::get_fd ()
#include <sys/types.h>
#include <sys/socket.h>
-zmq::signaler_t::signaler_t ()
+zmq::signaler_t::signaler_t () :
+ current (0),
+ count (0)
{
int sv [2];
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
@@ -245,50 +291,42 @@ zmq::signaler_t::~signaler_t ()
close (r);
}
-void zmq::signaler_t::signal (int signal_)
+void zmq::signaler_t::signal (uint32_t signal_)
{
// TODO: Note that send is a blocking operation.
// How should we behave if the signal cannot be written to the signaler?
-
- zmq_assert (signal_ >= 0 && signal_ < 64);
- unsigned char c = (unsigned char) signal_;
- ssize_t nbytes = send (w, &c, 1, 0);
- errno_assert (nbytes == 1);
+ ssize_t nbytes = send (w, &signal_, sizeof (signal_), 0);
+ errno_assert (nbytes != -1);
+ zmq_assert (nbytes == sizeof (signal_));
}
-uint64_t zmq::signaler_t::poll ()
+void zmq::signaler_t::xpoll ()
{
- unsigned char buffer [64];
- ssize_t nbytes = recv (r, buffer, 64, 0);
- zmq_assert (nbytes != -1);
+ ssize_t nbytes = recv (r, buffer, sizeof (buffer), 0);
+ errno_assert (nbytes != -1);
+
+ // Check whether we haven't got half of a signal.
+ zmq_assert (nbytes % sizeof (uint32_t) == 0);
- uint64_t signals = 0;
- for (int pos = 0; pos != nbytes; pos ++) {
- zmq_assert (buffer [pos] < 64);
- signals |= (uint64_t (1) << (buffer [pos]));
- }
- return signals;
+ current = 0;
+ count = nbytes / sizeof (uint32_t);
}
-uint64_t zmq::signaler_t::check ()
+void zmq::signaler_t::xcheck ()
{
- unsigned char buffer [64];
ssize_t nbytes = recv (r, buffer, 64, MSG_DONTWAIT);
- if (nbytes == -1 && errno == EAGAIN)
- return 0;
- zmq_assert (nbytes != -1);
-
- uint64_t signals = 0;
- for (int pos = 0; pos != nbytes; pos ++) {
- zmq_assert (buffer [pos] < 64);
- signals |= (uint64_t (1) << (buffer [pos]));
+ if (nbytes == -1 && errno == EAGAIN) {
+ current = 0;
+ count = 0;
+ return;
}
- return signals;
-}
+ errno_assert (nbytes != -1);
-zmq::fd_t zmq::signaler_t::get_fd ()
-{
- return r;
+ // Check whether we haven't got half of a signal.
+ zmq_assert (nbytes % sizeof (uint32_t) == 0);
+
+ current = 0;
+ count = nbytes / sizeof (uint32_t);
}
#endif