diff options
| -rw-r--r-- | src/app_thread.cpp | 4 | ||||
| -rw-r--r-- | src/fd_signaler.cpp | 55 | ||||
| -rw-r--r-- | src/fd_signaler.hpp | 12 | ||||
| -rw-r--r-- | src/i_signaler.hpp | 11 | ||||
| -rw-r--r-- | src/io_thread.cpp | 4 | ||||
| -rw-r--r-- | src/ypollset.cpp | 8 | ||||
| -rw-r--r-- | src/ypollset.hpp | 18 | 
7 files changed, 70 insertions, 42 deletions
| diff --git a/src/app_thread.cpp b/src/app_thread.cpp index c152fc8..8c83313 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -84,7 +84,7 @@ bool zmq::app_thread_t::make_current ()  void zmq::app_thread_t::process_commands (bool block_, bool throttle_)  { -    ypollset_t::signals_t signals; +    uint64_t signals;      if (block_)          signals = pollset.poll ();      else { @@ -127,7 +127,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_)          //  Traverse all the possible sources of commands and process          //  all the commands from all of them.          for (int i = 0; i != thread_slot_count (); i++) { -            if (signals & (ypollset_t::signals_t (1) << i)) { +            if (signals & (uint64_t (1) << i)) {                  command_t cmd;                  while (dispatcher->read (i, get_thread_slot (), &cmd))                      cmd.destination->process_command (cmd); diff --git a/src/fd_signaler.cpp b/src/fd_signaler.cpp index 41d168d..f93f4e0 100644 --- a/src/fd_signaler.cpp +++ b/src/fd_signaler.cpp @@ -59,16 +59,16 @@ zmq::fd_signaler_t::~fd_signaler_t ()  void zmq::fd_signaler_t::signal (int signal_)  {      zmq_assert (signal_ >= 0 && signal_ < 64); -    signals_t inc = 1; +    uint64_t inc = 1;      inc <<= signal_; -    ssize_t sz = write (fd, &inc, sizeof (signals_t)); -    errno_assert (sz == sizeof (signals_t)); +    ssize_t sz = write (fd, &inc, sizeof (uint64_t)); +    errno_assert (sz == sizeof (uint64_t));  } -zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check () +uint64_t zmq::fd_signaler_t::check ()  { -    signals_t val; -    ssize_t sz = read (fd, &val, sizeof (signals_t)); +    uint64_t val; +    ssize_t sz = read (fd, &val, sizeof (uint64_t));      if (sz == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||            errno == EINTR))          return 0; @@ -148,16 +148,30 @@ void zmq::fd_signaler_t::signal (int signal_)      win_assert (rc != SOCKET_ERROR);  } -zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check () +uint64_t zmq::fd_signaler_t::poll ()  { -    char buffer [32];       +    //  If there are signals available, return straight away. +    uint64_t signals = check (); +    if (signals) +        return signals; + +    //  If there are no signals, wait until at least one signal arrives. +    unsigned char sig; +    int nbytes = recv (r, &sig, 1, MSG_WAITALL); +    win_assert (nbytes != -1); +    return uint64_t (1) << sig; +} + +uint64_t zmq::fd_signaler_t::check () +{ +    unsigned char buffer [32];            int nbytes = recv (r, buffer, 32, 0);      win_assert (nbytes != -1); -    signals_t signals = 0; +    uint64_t signals = 0;      for (int pos = 0; pos != nbytes; pos++) {          zmq_assert (buffer [pos] < 64); -        signals |= (signals_t (1) << (buffer [pos])); +        signals |= (uint64_t (1) << (buffer [pos]));      }      return signals;  } @@ -202,15 +216,30 @@ void zmq::fd_signaler_t::signal (int signal_)      errno_assert (nbytes == 1);  } -zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check () +uint64_t zmq::fd_signaler_t::poll () +{ +    //  If there are signals available, return straight away. +    uint64_t signals = check (); +    if (signals) +        return signals; + +    //  If there are no signals, wait until at least one signal arrives. +    unsigned char sig; +    ssize_t nbytes = recv (r, &sig, 1, MSG_WAITALL); +    errno_assert (nbytes != -1); +    return uint64_t (1) << sig; +} + +uint64_t zmq::fd_signaler_t::check ()  {      unsigned char buffer [32];      ssize_t nbytes = recv (r, buffer, 32, 0);      errno_assert (nbytes != -1); -    signals_t signals = 0; + +    uint64_t signals = 0;      for (int pos = 0; pos != nbytes; pos ++) {          zmq_assert (buffer [pos] < 64); -        signals |= (1 << (buffer [pos])); +        signals |= (uint64_t (1) << (buffer [pos]));      }      return signals;  } diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp index 11baa95..107645e 100644 --- a/src/fd_signaler.hpp +++ b/src/fd_signaler.hpp @@ -37,22 +37,16 @@ namespace zmq      {      public: -        typedef uint64_t signals_t; -          //  Initialise the object.          fd_signaler_t ();          //  Destroy the object.          ~fd_signaler_t (); -        //  Send specific signal. +        //  i_signaler interface implementation.          void signal (int signal_); - -        //  Retrieves signals. Returns a set of signals in form of a bitmap. -        //  Signal with index 0 corresponds to value 1, index 1 to value 2, -        //  index 2 to value 4 etc. If there is no signal available, -        //  it returns zero immediately. -        signals_t check (); +        uint64_t poll (); +        uint64_t check ();          //  Get the file descriptor associated with the object.          fd_t get_fd (); diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp index adf54e5..729b5b7 100644 --- a/src/i_signaler.hpp +++ b/src/i_signaler.hpp @@ -20,6 +20,8 @@  #ifndef __ZMQ_I_SIGNALER_HPP_INCLUDED__  #define __ZMQ_I_SIGNALER_HPP_INCLUDED__ +#include "stdint.hpp" +  namespace zmq  {      //  Virtual interface used to send signals. Individual implementations @@ -31,6 +33,15 @@ namespace zmq          //  Send a signal with a specific ID.          virtual void signal (int signal_) = 0; + +        //  Wait for signal. Returns a set of signals in form of a bitmap. +        //  Signal with index 0 corresponds to value 1, index 1 to value 2, +        //  index 2 to value 3 etc. +        uint64_t poll (); + +        //  Same as poll, however, if there is no signal available, +        //  function returns zero immediately instead of waiting for a signal. +        uint64_t check ();      };  } diff --git a/src/io_thread.cpp b/src/io_thread.cpp index b6521e9..a90876c 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -104,7 +104,7 @@ int zmq::io_thread_t::get_load ()  void zmq::io_thread_t::in_event ()  {      //  Find out which threads are sending us commands. -    fd_signaler_t::signals_t signals = signaler.check (); +    uint64_t signals = signaler.check ();      zmq_assert (signals);      //  Iterate through all the threads in the process and find out @@ -112,7 +112,7 @@ void zmq::io_thread_t::in_event ()      int slot_count = thread_slot_count ();      for (int source_thread_slot = 0;            source_thread_slot != slot_count; source_thread_slot++) { -        if (signals & (fd_signaler_t::signals_t (1) << source_thread_slot)) { +        if (signals & (uint64_t (1) << source_thread_slot)) {              //  Read all the commands from particular thread.              command_t cmd; diff --git a/src/ypollset.cpp b/src/ypollset.cpp index a90d042..5a7c69f 100644 --- a/src/ypollset.cpp +++ b/src/ypollset.cpp @@ -30,7 +30,7 @@ void zmq::ypollset_t::signal (int signal_)          sem.post ();   } -zmq::ypollset_t::signals_t zmq::ypollset_t::poll () +uint64_t zmq::ypollset_t::poll ()  {      signals_t result = 0;      while (!result) { @@ -47,10 +47,10 @@ zmq::ypollset_t::signals_t zmq::ypollset_t::poll ()          //  operation (set and reset). In such case looping can occur          //  sporadically.      } -    return result;       +    return (uint64_t) result;        } -zmq::ypollset_t::signals_t zmq::ypollset_t::check () +uint64_t zmq::ypollset_t::check ()  { -    return bits.xchg (0); +    return (uint64_t) bits.xchg (0);  } diff --git a/src/ypollset.hpp b/src/ypollset.hpp index b49581a..296201a 100644 --- a/src/ypollset.hpp +++ b/src/ypollset.hpp @@ -35,25 +35,19 @@ namespace zmq      {      public: -        typedef atomic_bitmap_t::bitmap_t signals_t; -          //  Create the pollset.          ypollset_t (); -        //  Send a signal to the pollset (i_singnaler implementation). +        //  i_signaler interface implementation.          void signal (int signal_); - -        //  Wait for signal. Returns a set of signals in form of a bitmap. -        //  Signal with index 0 corresponds to value 1, index 1 to value 2, -        //  index 2 to value 3 etc. -        signals_t poll (); - -        //  Same as poll, however, if there is no signal available, -        //  function returns zero immediately instead of waiting for a signal. -        signals_t check (); +        uint64_t poll (); +        uint64_t check ();      private: +        //  Internal representation of signal bitmap. +        typedef atomic_bitmap_t::bitmap_t signals_t; +          //  Wait signal is carried in the most significant bit of integer.          enum {wait_signal = sizeof (signals_t) * 8 - 1}; | 
