summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-09-20 10:47:27 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-09-20 10:47:27 +0200
commitf99b8fc921bc0e6aa55276d8c55e43c9d7f4375a (patch)
tree2201248b8fb13ec7d2831b74b349a61fd1e07fa4
parent50a8b9ea0c4a819073b46449dee8fc839b837ae5 (diff)
receiving side of signaler virtualised
-rw-r--r--src/app_thread.cpp4
-rw-r--r--src/fd_signaler.cpp55
-rw-r--r--src/fd_signaler.hpp12
-rw-r--r--src/i_signaler.hpp11
-rw-r--r--src/io_thread.cpp4
-rw-r--r--src/ypollset.cpp8
-rw-r--r--src/ypollset.hpp18
7 files changed, 70 insertions, 42 deletions
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index c152fc8..8c83313 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -84,7 +84,7 @@ bool zmq::app_thread_t::make_current ()
void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
{
- ypollset_t::signals_t signals;
+ uint64_t signals;
if (block_)
signals = pollset.poll ();
else {
@@ -127,7 +127,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
// Traverse all the possible sources of commands and process
// all the commands from all of them.
for (int i = 0; i != thread_slot_count (); i++) {
- if (signals & (ypollset_t::signals_t (1) << i)) {
+ if (signals & (uint64_t (1) << i)) {
command_t cmd;
while (dispatcher->read (i, get_thread_slot (), &cmd))
cmd.destination->process_command (cmd);
diff --git a/src/fd_signaler.cpp b/src/fd_signaler.cpp
index 41d168d..f93f4e0 100644
--- a/src/fd_signaler.cpp
+++ b/src/fd_signaler.cpp
@@ -59,16 +59,16 @@ zmq::fd_signaler_t::~fd_signaler_t ()
void zmq::fd_signaler_t::signal (int signal_)
{
zmq_assert (signal_ >= 0 && signal_ < 64);
- signals_t inc = 1;
+ uint64_t inc = 1;
inc <<= signal_;
- ssize_t sz = write (fd, &inc, sizeof (signals_t));
- errno_assert (sz == sizeof (signals_t));
+ ssize_t sz = write (fd, &inc, sizeof (uint64_t));
+ errno_assert (sz == sizeof (uint64_t));
}
-zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check ()
+uint64_t zmq::fd_signaler_t::check ()
{
- signals_t val;
- ssize_t sz = read (fd, &val, sizeof (signals_t));
+ uint64_t val;
+ ssize_t sz = read (fd, &val, sizeof (uint64_t));
if (sz == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR))
return 0;
@@ -148,16 +148,30 @@ void zmq::fd_signaler_t::signal (int signal_)
win_assert (rc != SOCKET_ERROR);
}
-zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check ()
+uint64_t zmq::fd_signaler_t::poll ()
{
- char buffer [32];
+ // If there are signals available, return straight away.
+ uint64_t signals = check ();
+ if (signals)
+ return signals;
+
+ // If there are no signals, wait until at least one signal arrives.
+ unsigned char sig;
+ int nbytes = recv (r, &sig, 1, MSG_WAITALL);
+ win_assert (nbytes != -1);
+ return uint64_t (1) << sig;
+}
+
+uint64_t zmq::fd_signaler_t::check ()
+{
+ unsigned char buffer [32];
int nbytes = recv (r, buffer, 32, 0);
win_assert (nbytes != -1);
- signals_t signals = 0;
+ uint64_t signals = 0;
for (int pos = 0; pos != nbytes; pos++) {
zmq_assert (buffer [pos] < 64);
- signals |= (signals_t (1) << (buffer [pos]));
+ signals |= (uint64_t (1) << (buffer [pos]));
}
return signals;
}
@@ -202,15 +216,30 @@ void zmq::fd_signaler_t::signal (int signal_)
errno_assert (nbytes == 1);
}
-zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check ()
+uint64_t zmq::fd_signaler_t::poll ()
+{
+ // If there are signals available, return straight away.
+ uint64_t signals = check ();
+ if (signals)
+ return signals;
+
+ // If there are no signals, wait until at least one signal arrives.
+ unsigned char sig;
+ ssize_t nbytes = recv (r, &sig, 1, MSG_WAITALL);
+ errno_assert (nbytes != -1);
+ return uint64_t (1) << sig;
+}
+
+uint64_t zmq::fd_signaler_t::check ()
{
unsigned char buffer [32];
ssize_t nbytes = recv (r, buffer, 32, 0);
errno_assert (nbytes != -1);
- signals_t signals = 0;
+
+ uint64_t signals = 0;
for (int pos = 0; pos != nbytes; pos ++) {
zmq_assert (buffer [pos] < 64);
- signals |= (1 << (buffer [pos]));
+ signals |= (uint64_t (1) << (buffer [pos]));
}
return signals;
}
diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp
index 11baa95..107645e 100644
--- a/src/fd_signaler.hpp
+++ b/src/fd_signaler.hpp
@@ -37,22 +37,16 @@ namespace zmq
{
public:
- typedef uint64_t signals_t;
-
// Initialise the object.
fd_signaler_t ();
// Destroy the object.
~fd_signaler_t ();
- // Send specific signal.
+ // i_signaler interface implementation.
void signal (int signal_);
-
- // Retrieves signals. Returns a set of signals in form of a bitmap.
- // Signal with index 0 corresponds to value 1, index 1 to value 2,
- // index 2 to value 4 etc. If there is no signal available,
- // it returns zero immediately.
- signals_t check ();
+ uint64_t poll ();
+ uint64_t check ();
// Get the file descriptor associated with the object.
fd_t get_fd ();
diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp
index adf54e5..729b5b7 100644
--- a/src/i_signaler.hpp
+++ b/src/i_signaler.hpp
@@ -20,6 +20,8 @@
#ifndef __ZMQ_I_SIGNALER_HPP_INCLUDED__
#define __ZMQ_I_SIGNALER_HPP_INCLUDED__
+#include "stdint.hpp"
+
namespace zmq
{
// Virtual interface used to send signals. Individual implementations
@@ -31,6 +33,15 @@ namespace zmq
// Send a signal with a specific ID.
virtual void signal (int signal_) = 0;
+
+ // Wait for signal. Returns a set of signals in form of a bitmap.
+ // Signal with index 0 corresponds to value 1, index 1 to value 2,
+ // index 2 to value 3 etc.
+ uint64_t poll ();
+
+ // Same as poll, however, if there is no signal available,
+ // function returns zero immediately instead of waiting for a signal.
+ uint64_t check ();
};
}
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index b6521e9..a90876c 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -104,7 +104,7 @@ int zmq::io_thread_t::get_load ()
void zmq::io_thread_t::in_event ()
{
// Find out which threads are sending us commands.
- fd_signaler_t::signals_t signals = signaler.check ();
+ uint64_t signals = signaler.check ();
zmq_assert (signals);
// Iterate through all the threads in the process and find out
@@ -112,7 +112,7 @@ void zmq::io_thread_t::in_event ()
int slot_count = thread_slot_count ();
for (int source_thread_slot = 0;
source_thread_slot != slot_count; source_thread_slot++) {
- if (signals & (fd_signaler_t::signals_t (1) << source_thread_slot)) {
+ if (signals & (uint64_t (1) << source_thread_slot)) {
// Read all the commands from particular thread.
command_t cmd;
diff --git a/src/ypollset.cpp b/src/ypollset.cpp
index a90d042..5a7c69f 100644
--- a/src/ypollset.cpp
+++ b/src/ypollset.cpp
@@ -30,7 +30,7 @@ void zmq::ypollset_t::signal (int signal_)
sem.post ();
}
-zmq::ypollset_t::signals_t zmq::ypollset_t::poll ()
+uint64_t zmq::ypollset_t::poll ()
{
signals_t result = 0;
while (!result) {
@@ -47,10 +47,10 @@ zmq::ypollset_t::signals_t zmq::ypollset_t::poll ()
// operation (set and reset). In such case looping can occur
// sporadically.
}
- return result;
+ return (uint64_t) result;
}
-zmq::ypollset_t::signals_t zmq::ypollset_t::check ()
+uint64_t zmq::ypollset_t::check ()
{
- return bits.xchg (0);
+ return (uint64_t) bits.xchg (0);
}
diff --git a/src/ypollset.hpp b/src/ypollset.hpp
index b49581a..296201a 100644
--- a/src/ypollset.hpp
+++ b/src/ypollset.hpp
@@ -35,25 +35,19 @@ namespace zmq
{
public:
- typedef atomic_bitmap_t::bitmap_t signals_t;
-
// Create the pollset.
ypollset_t ();
- // Send a signal to the pollset (i_singnaler implementation).
+ // i_signaler interface implementation.
void signal (int signal_);
-
- // Wait for signal. Returns a set of signals in form of a bitmap.
- // Signal with index 0 corresponds to value 1, index 1 to value 2,
- // index 2 to value 3 etc.
- signals_t poll ();
-
- // Same as poll, however, if there is no signal available,
- // function returns zero immediately instead of waiting for a signal.
- signals_t check ();
+ uint64_t poll ();
+ uint64_t check ();
private:
+ // Internal representation of signal bitmap.
+ typedef atomic_bitmap_t::bitmap_t signals_t;
+
// Wait signal is carried in the most significant bit of integer.
enum {wait_signal = sizeof (signals_t) * 8 - 1};