From f99b8fc921bc0e6aa55276d8c55e43c9d7f4375a Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 20 Sep 2009 10:47:27 +0200 Subject: receiving side of signaler virtualised --- src/app_thread.cpp | 4 ++-- src/fd_signaler.cpp | 55 ++++++++++++++++++++++++++++++++++++++++------------- src/fd_signaler.hpp | 12 +++--------- src/i_signaler.hpp | 11 +++++++++++ src/io_thread.cpp | 4 ++-- src/ypollset.cpp | 8 ++++---- src/ypollset.hpp | 18 ++++++------------ 7 files changed, 70 insertions(+), 42 deletions(-) (limited to 'src') diff --git a/src/app_thread.cpp b/src/app_thread.cpp index c152fc8..8c83313 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -84,7 +84,7 @@ bool zmq::app_thread_t::make_current () void zmq::app_thread_t::process_commands (bool block_, bool throttle_) { - ypollset_t::signals_t signals; + uint64_t signals; if (block_) signals = pollset.poll (); else { @@ -127,7 +127,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_) // Traverse all the possible sources of commands and process // all the commands from all of them. for (int i = 0; i != thread_slot_count (); i++) { - if (signals & (ypollset_t::signals_t (1) << i)) { + if (signals & (uint64_t (1) << i)) { command_t cmd; while (dispatcher->read (i, get_thread_slot (), &cmd)) cmd.destination->process_command (cmd); diff --git a/src/fd_signaler.cpp b/src/fd_signaler.cpp index 41d168d..f93f4e0 100644 --- a/src/fd_signaler.cpp +++ b/src/fd_signaler.cpp @@ -59,16 +59,16 @@ zmq::fd_signaler_t::~fd_signaler_t () void zmq::fd_signaler_t::signal (int signal_) { zmq_assert (signal_ >= 0 && signal_ < 64); - signals_t inc = 1; + uint64_t inc = 1; inc <<= signal_; - ssize_t sz = write (fd, &inc, sizeof (signals_t)); - errno_assert (sz == sizeof (signals_t)); + ssize_t sz = write (fd, &inc, sizeof (uint64_t)); + errno_assert (sz == sizeof (uint64_t)); } -zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check () +uint64_t zmq::fd_signaler_t::check () { - signals_t val; - ssize_t sz = read (fd, &val, sizeof (signals_t)); + uint64_t val; + ssize_t sz = read (fd, &val, sizeof (uint64_t)); if (sz == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) return 0; @@ -148,16 +148,30 @@ void zmq::fd_signaler_t::signal (int signal_) win_assert (rc != SOCKET_ERROR); } -zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check () +uint64_t zmq::fd_signaler_t::poll () { - char buffer [32]; + // If there are signals available, return straight away. + uint64_t signals = check (); + if (signals) + return signals; + + // If there are no signals, wait until at least one signal arrives. + unsigned char sig; + int nbytes = recv (r, &sig, 1, MSG_WAITALL); + win_assert (nbytes != -1); + return uint64_t (1) << sig; +} + +uint64_t zmq::fd_signaler_t::check () +{ + unsigned char buffer [32]; int nbytes = recv (r, buffer, 32, 0); win_assert (nbytes != -1); - signals_t signals = 0; + uint64_t signals = 0; for (int pos = 0; pos != nbytes; pos++) { zmq_assert (buffer [pos] < 64); - signals |= (signals_t (1) << (buffer [pos])); + signals |= (uint64_t (1) << (buffer [pos])); } return signals; } @@ -202,15 +216,30 @@ void zmq::fd_signaler_t::signal (int signal_) errno_assert (nbytes == 1); } -zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check () +uint64_t zmq::fd_signaler_t::poll () +{ + // If there are signals available, return straight away. + uint64_t signals = check (); + if (signals) + return signals; + + // If there are no signals, wait until at least one signal arrives. + unsigned char sig; + ssize_t nbytes = recv (r, &sig, 1, MSG_WAITALL); + errno_assert (nbytes != -1); + return uint64_t (1) << sig; +} + +uint64_t zmq::fd_signaler_t::check () { unsigned char buffer [32]; ssize_t nbytes = recv (r, buffer, 32, 0); errno_assert (nbytes != -1); - signals_t signals = 0; + + uint64_t signals = 0; for (int pos = 0; pos != nbytes; pos ++) { zmq_assert (buffer [pos] < 64); - signals |= (1 << (buffer [pos])); + signals |= (uint64_t (1) << (buffer [pos])); } return signals; } diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp index 11baa95..107645e 100644 --- a/src/fd_signaler.hpp +++ b/src/fd_signaler.hpp @@ -37,22 +37,16 @@ namespace zmq { public: - typedef uint64_t signals_t; - // Initialise the object. fd_signaler_t (); // Destroy the object. ~fd_signaler_t (); - // Send specific signal. + // i_signaler interface implementation. void signal (int signal_); - - // Retrieves signals. Returns a set of signals in form of a bitmap. - // Signal with index 0 corresponds to value 1, index 1 to value 2, - // index 2 to value 4 etc. If there is no signal available, - // it returns zero immediately. - signals_t check (); + uint64_t poll (); + uint64_t check (); // Get the file descriptor associated with the object. fd_t get_fd (); diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp index adf54e5..729b5b7 100644 --- a/src/i_signaler.hpp +++ b/src/i_signaler.hpp @@ -20,6 +20,8 @@ #ifndef __ZMQ_I_SIGNALER_HPP_INCLUDED__ #define __ZMQ_I_SIGNALER_HPP_INCLUDED__ +#include "stdint.hpp" + namespace zmq { // Virtual interface used to send signals. Individual implementations @@ -31,6 +33,15 @@ namespace zmq // Send a signal with a specific ID. virtual void signal (int signal_) = 0; + + // Wait for signal. Returns a set of signals in form of a bitmap. + // Signal with index 0 corresponds to value 1, index 1 to value 2, + // index 2 to value 3 etc. + uint64_t poll (); + + // Same as poll, however, if there is no signal available, + // function returns zero immediately instead of waiting for a signal. + uint64_t check (); }; } diff --git a/src/io_thread.cpp b/src/io_thread.cpp index b6521e9..a90876c 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -104,7 +104,7 @@ int zmq::io_thread_t::get_load () void zmq::io_thread_t::in_event () { // Find out which threads are sending us commands. - fd_signaler_t::signals_t signals = signaler.check (); + uint64_t signals = signaler.check (); zmq_assert (signals); // Iterate through all the threads in the process and find out @@ -112,7 +112,7 @@ void zmq::io_thread_t::in_event () int slot_count = thread_slot_count (); for (int source_thread_slot = 0; source_thread_slot != slot_count; source_thread_slot++) { - if (signals & (fd_signaler_t::signals_t (1) << source_thread_slot)) { + if (signals & (uint64_t (1) << source_thread_slot)) { // Read all the commands from particular thread. command_t cmd; diff --git a/src/ypollset.cpp b/src/ypollset.cpp index a90d042..5a7c69f 100644 --- a/src/ypollset.cpp +++ b/src/ypollset.cpp @@ -30,7 +30,7 @@ void zmq::ypollset_t::signal (int signal_) sem.post (); } -zmq::ypollset_t::signals_t zmq::ypollset_t::poll () +uint64_t zmq::ypollset_t::poll () { signals_t result = 0; while (!result) { @@ -47,10 +47,10 @@ zmq::ypollset_t::signals_t zmq::ypollset_t::poll () // operation (set and reset). In such case looping can occur // sporadically. } - return result; + return (uint64_t) result; } -zmq::ypollset_t::signals_t zmq::ypollset_t::check () +uint64_t zmq::ypollset_t::check () { - return bits.xchg (0); + return (uint64_t) bits.xchg (0); } diff --git a/src/ypollset.hpp b/src/ypollset.hpp index b49581a..296201a 100644 --- a/src/ypollset.hpp +++ b/src/ypollset.hpp @@ -35,25 +35,19 @@ namespace zmq { public: - typedef atomic_bitmap_t::bitmap_t signals_t; - // Create the pollset. ypollset_t (); - // Send a signal to the pollset (i_singnaler implementation). + // i_signaler interface implementation. void signal (int signal_); - - // Wait for signal. Returns a set of signals in form of a bitmap. - // Signal with index 0 corresponds to value 1, index 1 to value 2, - // index 2 to value 3 etc. - signals_t poll (); - - // Same as poll, however, if there is no signal available, - // function returns zero immediately instead of waiting for a signal. - signals_t check (); + uint64_t poll (); + uint64_t check (); private: + // Internal representation of signal bitmap. + typedef atomic_bitmap_t::bitmap_t signals_t; + // Wait signal is carried in the most significant bit of integer. enum {wait_signal = sizeof (signals_t) * 8 - 1}; -- cgit v1.2.3