diff options
| -rw-r--r-- | src/app_thread.cpp | 18 | ||||
| -rw-r--r-- | src/app_thread.hpp | 5 | ||||
| -rw-r--r-- | src/fd_signaler.cpp | 39 | ||||
| -rw-r--r-- | src/fd_signaler.hpp | 3 | ||||
| -rw-r--r-- | src/i_signaler.hpp | 4 | ||||
| -rw-r--r-- | src/ypollset.cpp | 4 | ||||
| -rw-r--r-- | src/ypollset.hpp | 2 | 
7 files changed, 48 insertions, 27 deletions
| diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 8c83313..303c6a1 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -31,6 +31,8 @@  #include "app_thread.hpp"  #include "dispatcher.hpp" +#include "fd_signaler.hpp" +#include "ypollset.hpp"  #include "err.hpp"  #include "pipe.hpp"  #include "config.hpp" @@ -52,16 +54,26 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_,      associated (false),      last_processing_time (0)  { +    if (flags_ & ZMQ_POLL) { +        signaler = new fd_signaler_t; +        zmq_assert (signaler); +    } +    else { +        signaler = new ypollset_t; +        zmq_assert (signaler); +    }  }  zmq::app_thread_t::~app_thread_t ()  {      zmq_assert (sockets.empty ()); +    zmq_assert (signaler); +    delete signaler;  }  zmq::i_signaler *zmq::app_thread_t::get_signaler ()  { -    return &pollset; +    return signaler;  }  bool zmq::app_thread_t::is_current () @@ -86,7 +98,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_)  {      uint64_t signals;      if (block_) -        signals = pollset.poll (); +        signals = signaler->poll ();      else {  #if defined ZMQ_DELAY_COMMANDS @@ -119,7 +131,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_)  #endif          //  Check whether there are any commands pending for this thread. -        signals = pollset.check (); +        signals = signaler->check ();      }      if (signals) { diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 5fdb92d..4fe67fb 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -24,7 +24,6 @@  #include "stdint.hpp"  #include "object.hpp" -#include "ypollset.hpp"  #include "thread.hpp"  namespace zmq @@ -40,7 +39,7 @@ namespace zmq          ~app_thread_t ();          //  Returns signaler associated with this application thread. -        i_signaler *get_signaler (); +        struct i_signaler *get_signaler ();          //  Nota bene: Following two functions are accessed from different          //  threads. The caller (dispatcher) is responsible for synchronisation @@ -79,7 +78,7 @@ namespace zmq          thread_t::id_t tid;          //  App thread's signaler object. -        ypollset_t pollset; +        struct i_signaler *signaler;          //  Timestamp of when commands were processed the last time.          uint64_t last_processing_time; diff --git a/src/fd_signaler.cpp b/src/fd_signaler.cpp index 52c3129..8c71356 100644 --- a/src/fd_signaler.cpp +++ b/src/fd_signaler.cpp @@ -65,12 +65,18 @@ void zmq::fd_signaler_t::signal (int signal_)      errno_assert (sz == sizeof (uint64_t));  } +uint64_t zmq::fd_signaler_t::poll () +{ +    //  TODO: Can we do a blocking read on non-blocking eventfd? +    //  It's not needed as for now, so let it stay unimplemented. +    zmq_assert (false); +} +  uint64_t zmq::fd_signaler_t::check ()  {      uint64_t val;      ssize_t sz = read (fd, &val, sizeof (uint64_t)); -    if (sz == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || -          errno == EINTR)) +    if (sz == -1 && (errno == EAGAIN || errno == EINTR))          return 0;      errno_assert (sz != -1);      return val; @@ -120,7 +126,7 @@ zmq::fd_signaler_t::fd_signaler_t ()      rc = connect (w, (sockaddr *) &addr, sizeof (addr));      wsa_assert (rc != SOCKET_ERROR); -    //  Accept connection from w +    //  Accept connection from w.      r = accept (listener, NULL, NULL);      wsa_assert (r != INVALID_SOCKET); @@ -139,6 +145,9 @@ zmq::fd_signaler_t::~fd_signaler_t ()  void zmq::fd_signaler_t::signal (int signal_)  { +    //  TODO: Note that send is a blocking operation. +    //  How should we behave if the signal cannot be written to the signaler? +      zmq_assert (signal_ >= 0 && signal_ < 64);      char c = (char) signal_;      int rc = send (w, &c, 1, 0); @@ -154,7 +163,7 @@ uint64_t zmq::fd_signaler_t::poll ()      //  If there are no signals, wait until at least one signal arrives.      unsigned char sig; -    int nbytes = recv (r, (char*) &sig, 1, MSG_WAITALL); +    int nbytes = recv (r, (char*) &sig, 1, 0);      win_assert (nbytes != -1);      return uint64_t (1) << sig;  } @@ -162,7 +171,9 @@ uint64_t zmq::fd_signaler_t::poll ()  uint64_t zmq::fd_signaler_t::check ()  {      unsigned char buffer [32];       -    int nbytes = recv (r, (char*) buffer, 32, 0); +    int nbytes = recv (r, (char*) buffer, 32, MSG_DONTWAIT); +    if (nbytes == -1 && errno == EAGAIN) +        return 0;      win_assert (nbytes != -1);      uint64_t signals = 0; @@ -190,13 +201,6 @@ zmq::fd_signaler_t::fd_signaler_t ()      errno_assert (rc == 0);      w = sv [0];      r = sv [1]; -                       -    //  Set to non-blocking mode. -    int flags = fcntl (r, F_GETFL, 0); -    if (flags == -1) -        flags = 0; -    rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); -    errno_assert (rc != -1);  }  zmq::fd_signaler_t::~fd_signaler_t () @@ -207,6 +211,9 @@ zmq::fd_signaler_t::~fd_signaler_t ()  void zmq::fd_signaler_t::signal (int signal_)  { +    //  TODO: Note that send is a blocking operation. +    //  How should we behave if the signal cannot be written to the signaler? +      zmq_assert (signal_ >= 0 && signal_ < 64);      unsigned char c = (unsigned char) signal_;      ssize_t nbytes = send (w, &c, 1, 0); @@ -222,7 +229,7 @@ uint64_t zmq::fd_signaler_t::poll ()      //  If there are no signals, wait until at least one signal arrives.      unsigned char sig; -    ssize_t nbytes = recv (r, &sig, 1, MSG_WAITALL); +    ssize_t nbytes = recv (r, &sig, 1, 0);      errno_assert (nbytes != -1);      return uint64_t (1) << sig;  } @@ -230,8 +237,10 @@ uint64_t zmq::fd_signaler_t::poll ()  uint64_t zmq::fd_signaler_t::check ()  {      unsigned char buffer [32]; -    ssize_t nbytes = recv (r, buffer, 32, 0); -    errno_assert (nbytes != -1); +    ssize_t nbytes = recv (r, buffer, 32, MSG_DONTWAIT); +    if (nbytes == -1 && errno == EAGAIN) +        return 0; +    zmq_assert (nbytes != -1);      uint64_t signals = 0;      for (int pos = 0; pos != nbytes; pos ++) { diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp index 107645e..e1b56ad 100644 --- a/src/fd_signaler.hpp +++ b/src/fd_signaler.hpp @@ -37,10 +37,7 @@ namespace zmq      {      public: -        //  Initialise the object.          fd_signaler_t (); - -        //  Destroy the object.          ~fd_signaler_t ();          //  i_signaler interface implementation. diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp index 729b5b7..a09fe7e 100644 --- a/src/i_signaler.hpp +++ b/src/i_signaler.hpp @@ -37,11 +37,11 @@ namespace zmq          //  Wait for signal. Returns a set of signals in form of a bitmap.          //  Signal with index 0 corresponds to value 1, index 1 to value 2,          //  index 2 to value 3 etc. -        uint64_t poll (); +        virtual uint64_t poll () = 0;          //  Same as poll, however, if there is no signal available,          //  function returns zero immediately instead of waiting for a signal. -        uint64_t check (); +        virtual uint64_t check () = 0;      };  } diff --git a/src/ypollset.cpp b/src/ypollset.cpp index 5a7c69f..a72eac7 100644 --- a/src/ypollset.cpp +++ b/src/ypollset.cpp @@ -23,6 +23,10 @@ zmq::ypollset_t::ypollset_t ()  {  } +zmq::ypollset_t::~ypollset_t () +{ +} +  void zmq::ypollset_t::signal (int signal_)  {      zmq_assert (signal_ >= 0 && signal_ < wait_signal); diff --git a/src/ypollset.hpp b/src/ypollset.hpp index 296201a..25eb3e0 100644 --- a/src/ypollset.hpp +++ b/src/ypollset.hpp @@ -35,8 +35,8 @@ namespace zmq      {      public: -        //  Create the pollset.          ypollset_t (); +        ~ypollset_t ();          //  i_signaler interface implementation.          void signal (int signal_); | 
