diff options
-rw-r--r-- | src/app_thread.cpp | 18 | ||||
-rw-r--r-- | src/app_thread.hpp | 5 | ||||
-rw-r--r-- | src/fd_signaler.cpp | 39 | ||||
-rw-r--r-- | src/fd_signaler.hpp | 3 | ||||
-rw-r--r-- | src/i_signaler.hpp | 4 | ||||
-rw-r--r-- | src/ypollset.cpp | 4 | ||||
-rw-r--r-- | src/ypollset.hpp | 2 |
7 files changed, 48 insertions, 27 deletions
diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 8c83313..303c6a1 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -31,6 +31,8 @@ #include "app_thread.hpp" #include "dispatcher.hpp" +#include "fd_signaler.hpp" +#include "ypollset.hpp" #include "err.hpp" #include "pipe.hpp" #include "config.hpp" @@ -52,16 +54,26 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_, associated (false), last_processing_time (0) { + if (flags_ & ZMQ_POLL) { + signaler = new fd_signaler_t; + zmq_assert (signaler); + } + else { + signaler = new ypollset_t; + zmq_assert (signaler); + } } zmq::app_thread_t::~app_thread_t () { zmq_assert (sockets.empty ()); + zmq_assert (signaler); + delete signaler; } zmq::i_signaler *zmq::app_thread_t::get_signaler () { - return &pollset; + return signaler; } bool zmq::app_thread_t::is_current () @@ -86,7 +98,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_) { uint64_t signals; if (block_) - signals = pollset.poll (); + signals = signaler->poll (); else { #if defined ZMQ_DELAY_COMMANDS @@ -119,7 +131,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_) #endif // Check whether there are any commands pending for this thread. - signals = pollset.check (); + signals = signaler->check (); } if (signals) { diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 5fdb92d..4fe67fb 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -24,7 +24,6 @@ #include "stdint.hpp" #include "object.hpp" -#include "ypollset.hpp" #include "thread.hpp" namespace zmq @@ -40,7 +39,7 @@ namespace zmq ~app_thread_t (); // Returns signaler associated with this application thread. - i_signaler *get_signaler (); + struct i_signaler *get_signaler (); // Nota bene: Following two functions are accessed from different // threads. The caller (dispatcher) is responsible for synchronisation @@ -79,7 +78,7 @@ namespace zmq thread_t::id_t tid; // App thread's signaler object. - ypollset_t pollset; + struct i_signaler *signaler; // Timestamp of when commands were processed the last time. uint64_t last_processing_time; diff --git a/src/fd_signaler.cpp b/src/fd_signaler.cpp index 52c3129..8c71356 100644 --- a/src/fd_signaler.cpp +++ b/src/fd_signaler.cpp @@ -65,12 +65,18 @@ void zmq::fd_signaler_t::signal (int signal_) errno_assert (sz == sizeof (uint64_t)); } +uint64_t zmq::fd_signaler_t::poll () +{ + // TODO: Can we do a blocking read on non-blocking eventfd? + // It's not needed as for now, so let it stay unimplemented. + zmq_assert (false); +} + uint64_t zmq::fd_signaler_t::check () { uint64_t val; ssize_t sz = read (fd, &val, sizeof (uint64_t)); - if (sz == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || - errno == EINTR)) + if (sz == -1 && (errno == EAGAIN || errno == EINTR)) return 0; errno_assert (sz != -1); return val; @@ -120,7 +126,7 @@ zmq::fd_signaler_t::fd_signaler_t () rc = connect (w, (sockaddr *) &addr, sizeof (addr)); wsa_assert (rc != SOCKET_ERROR); - // Accept connection from w + // Accept connection from w. r = accept (listener, NULL, NULL); wsa_assert (r != INVALID_SOCKET); @@ -139,6 +145,9 @@ zmq::fd_signaler_t::~fd_signaler_t () void zmq::fd_signaler_t::signal (int signal_) { + // TODO: Note that send is a blocking operation. + // How should we behave if the signal cannot be written to the signaler? + zmq_assert (signal_ >= 0 && signal_ < 64); char c = (char) signal_; int rc = send (w, &c, 1, 0); @@ -154,7 +163,7 @@ uint64_t zmq::fd_signaler_t::poll () // If there are no signals, wait until at least one signal arrives. unsigned char sig; - int nbytes = recv (r, (char*) &sig, 1, MSG_WAITALL); + int nbytes = recv (r, (char*) &sig, 1, 0); win_assert (nbytes != -1); return uint64_t (1) << sig; } @@ -162,7 +171,9 @@ uint64_t zmq::fd_signaler_t::poll () uint64_t zmq::fd_signaler_t::check () { unsigned char buffer [32]; - int nbytes = recv (r, (char*) buffer, 32, 0); + int nbytes = recv (r, (char*) buffer, 32, MSG_DONTWAIT); + if (nbytes == -1 && errno == EAGAIN) + return 0; win_assert (nbytes != -1); uint64_t signals = 0; @@ -190,13 +201,6 @@ zmq::fd_signaler_t::fd_signaler_t () errno_assert (rc == 0); w = sv [0]; r = sv [1]; - - // Set to non-blocking mode. - int flags = fcntl (r, F_GETFL, 0); - if (flags == -1) - flags = 0; - rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc != -1); } zmq::fd_signaler_t::~fd_signaler_t () @@ -207,6 +211,9 @@ zmq::fd_signaler_t::~fd_signaler_t () void zmq::fd_signaler_t::signal (int signal_) { + // TODO: Note that send is a blocking operation. + // How should we behave if the signal cannot be written to the signaler? + zmq_assert (signal_ >= 0 && signal_ < 64); unsigned char c = (unsigned char) signal_; ssize_t nbytes = send (w, &c, 1, 0); @@ -222,7 +229,7 @@ uint64_t zmq::fd_signaler_t::poll () // If there are no signals, wait until at least one signal arrives. unsigned char sig; - ssize_t nbytes = recv (r, &sig, 1, MSG_WAITALL); + ssize_t nbytes = recv (r, &sig, 1, 0); errno_assert (nbytes != -1); return uint64_t (1) << sig; } @@ -230,8 +237,10 @@ uint64_t zmq::fd_signaler_t::poll () uint64_t zmq::fd_signaler_t::check () { unsigned char buffer [32]; - ssize_t nbytes = recv (r, buffer, 32, 0); - errno_assert (nbytes != -1); + ssize_t nbytes = recv (r, buffer, 32, MSG_DONTWAIT); + if (nbytes == -1 && errno == EAGAIN) + return 0; + zmq_assert (nbytes != -1); uint64_t signals = 0; for (int pos = 0; pos != nbytes; pos ++) { diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp index 107645e..e1b56ad 100644 --- a/src/fd_signaler.hpp +++ b/src/fd_signaler.hpp @@ -37,10 +37,7 @@ namespace zmq { public: - // Initialise the object. fd_signaler_t (); - - // Destroy the object. ~fd_signaler_t (); // i_signaler interface implementation. diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp index 729b5b7..a09fe7e 100644 --- a/src/i_signaler.hpp +++ b/src/i_signaler.hpp @@ -37,11 +37,11 @@ namespace zmq // Wait for signal. Returns a set of signals in form of a bitmap. // Signal with index 0 corresponds to value 1, index 1 to value 2, // index 2 to value 3 etc. - uint64_t poll (); + virtual uint64_t poll () = 0; // Same as poll, however, if there is no signal available, // function returns zero immediately instead of waiting for a signal. - uint64_t check (); + virtual uint64_t check () = 0; }; } diff --git a/src/ypollset.cpp b/src/ypollset.cpp index 5a7c69f..a72eac7 100644 --- a/src/ypollset.cpp +++ b/src/ypollset.cpp @@ -23,6 +23,10 @@ zmq::ypollset_t::ypollset_t () { } +zmq::ypollset_t::~ypollset_t () +{ +} + void zmq::ypollset_t::signal (int signal_) { zmq_assert (signal_ >= 0 && signal_ < wait_signal); diff --git a/src/ypollset.hpp b/src/ypollset.hpp index 296201a..25eb3e0 100644 --- a/src/ypollset.hpp +++ b/src/ypollset.hpp @@ -35,8 +35,8 @@ namespace zmq { public: - // Create the pollset. ypollset_t (); + ~ypollset_t (); // i_signaler interface implementation. void signal (int signal_); |