diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/app_thread.cpp | 25 | ||||
-rw-r--r-- | src/app_thread.hpp | 2 | ||||
-rw-r--r-- | src/config.hpp | 4 | ||||
-rw-r--r-- | src/dispatcher.cpp | 16 | ||||
-rw-r--r-- | src/dispatcher.hpp | 10 | ||||
-rw-r--r-- | src/io_thread.cpp | 30 | ||||
-rw-r--r-- | src/io_thread.hpp | 2 | ||||
-rw-r--r-- | src/object.cpp | 13 | ||||
-rw-r--r-- | src/object.hpp | 9 | ||||
-rw-r--r-- | src/signaler.cpp | 200 | ||||
-rw-r--r-- | src/signaler.hpp | 24 | ||||
-rw-r--r-- | src/zmq.cpp | 2 |
12 files changed, 190 insertions, 147 deletions
diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 10068c0..1350248 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -57,7 +57,8 @@ #define ZMQ_DELAY_COMMANDS #endif -zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : +zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, + uint32_t thread_slot_) : object_t (dispatcher_, thread_slot_), last_processing_time (0), terminated (false) @@ -81,9 +82,9 @@ zmq::signaler_t *zmq::app_thread_t::get_signaler () bool zmq::app_thread_t::process_commands (bool block_, bool throttle_) { - uint64_t signals; + uint32_t signal; if (block_) - signals = signaler.poll (); + signal = signaler.poll (); else { #if defined ZMQ_DELAY_COMMANDS @@ -116,20 +117,14 @@ bool zmq::app_thread_t::process_commands (bool block_, bool throttle_) #endif // Check whether there are any commands pending for this thread. - signals = signaler.check (); + signal = signaler.check (); } - if (signals) { - - // Traverse all the possible sources of commands and process - // all the commands from all of them. - for (int i = 0; i != thread_slot_count (); i++) { - if (signals & (uint64_t (1) << i)) { - command_t cmd; - while (dispatcher->read (i, get_thread_slot (), &cmd)) - cmd.destination->process_command (cmd); - } - } + // Process all the commands from the signaling source if there is one. + if (signal != signaler_t::no_signal) { + command_t cmd; + while (dispatcher->read (signal, get_thread_slot (), &cmd)) + cmd.destination->process_command (cmd); } return !terminated; diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 2bca757..bca6947 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -34,7 +34,7 @@ namespace zmq { public: - app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); + app_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_); ~app_thread_t (); diff --git a/src/config.hpp b/src/config.hpp index 12e29ca..99c9d86 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -37,6 +37,10 @@ namespace zmq // footprint of dispatcher. command_pipe_granularity = 4, + // Number of signals that can be read by the signaler + // using a single system call. + signal_buffer_size = 8, + // Determines how often does socket poll for new commands when it // still has unprocessed messages to handle. Thus, if it is set to 100, // socket will process 100 inbound messages before doing the poll. diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 1e11619..b1ba11f 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -33,7 +33,7 @@ #include "windows.h" #endif -zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) : +zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) : sockets (0), terminated (false) { @@ -49,7 +49,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) : #endif // Create application thread proxies. - for (int i = 0; i != app_threads_; i++) { + for (uint32_t i = 0; i != app_threads_; i++) { app_thread_info_t info; info.associated = false; info.app_thread = new (std::nothrow) app_thread_t (this, i); @@ -59,7 +59,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) : } // Create I/O thread objects. - for (int i = 0; i != io_threads_; i++) { + for (uint32_t i = 0; i != io_threads_; i++) { io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i + app_threads_); zmq_assert (io_thread); @@ -79,7 +79,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) : zmq_assert (command_pipes); // Launch I/O threads. - for (int i = 0; i != io_threads_; i++) + for (uint32_t i = 0; i != io_threads_; i++) io_threads [i]->start (); } @@ -136,9 +136,9 @@ zmq::dispatcher_t::~dispatcher_t () #endif } -int zmq::dispatcher_t::thread_slot_count () +uint32_t zmq::dispatcher_t::thread_slot_count () { - return signalers.size (); + return (uint32_t) signalers.size (); } zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_) @@ -213,7 +213,7 @@ void zmq::dispatcher_t::no_sockets (app_thread_t *thread_) app_threads_sync.unlock (); } -void zmq::dispatcher_t::write (int source_, int destination_, +void zmq::dispatcher_t::write (uint32_t source_, uint32_t destination_, const command_t &command_) { command_pipe_t &pipe = @@ -223,7 +223,7 @@ void zmq::dispatcher_t::write (int source_, int destination_, signalers [destination_]->signal (source_); } -bool zmq::dispatcher_t::read (int source_, int destination_, +bool zmq::dispatcher_t::read (uint32_t source_, uint32_t destination_, command_t *command_) { return command_pipes [source_ * signalers.size () + diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index ff08abc..0a1ed5c 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -51,7 +51,7 @@ namespace zmq // Create the dispatcher object. Matrix of pipes to communicate between // each socket and each I/O thread is created along with appropriate // signalers. - dispatcher_t (int app_threads_, int io_threads_); + dispatcher_t (uint32_t app_threads_, uint32_t io_threads_); // This function is called when user invokes zmq_term. If there are // no more sockets open it'll cause all the infrastructure to be shut @@ -72,14 +72,16 @@ namespace zmq // Returns number of thread slots in the dispatcher. To be used by // individual threads to find out how many distinct signals can be // received. - int thread_slot_count (); + uint32_t thread_slot_count (); // Send command from the source to the destination. - void write (int source_, int destination_, const command_t &command_); + void write (uint32_t source_, uint32_t destination_, + const command_t &command_); // Receive command from the source. Returns false if there is no // command available. - bool read (int source_, int destination_, command_t *command_); + bool read (uint32_t source_, uint32_t destination_, + command_t *command_); // Returns the I/O thread that is the least busy at the moment. // Taskset specifies which I/O threads are eligible (0 = all). diff --git a/src/io_thread.cpp b/src/io_thread.cpp index 7d997ad..e9f9aa5 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -28,7 +28,8 @@ #include "command.hpp" #include "dispatcher.hpp" -zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : +zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, + uint32_t thread_slot_) : object_t (dispatcher_, thread_slot_) { poller = new (std::nothrow) poller_t; @@ -66,22 +67,17 @@ int zmq::io_thread_t::get_load () void zmq::io_thread_t::in_event () { - // Find out which threads are sending us commands. - uint64_t signals = signaler.check (); - zmq_assert (signals); - - // Iterate through all the threads in the process and find out - // which of them sent us commands. - int slot_count = thread_slot_count (); - for (int source_thread_slot = 0; - source_thread_slot != slot_count; source_thread_slot++) { - if (signals & (uint64_t (1) << source_thread_slot)) { - - // Read all the commands from particular thread. - command_t cmd; - while (dispatcher->read (source_thread_slot, thread_slot, &cmd)) - cmd.destination->process_command (cmd); - } + while (true) { + + // Get the next signal. + uint32_t signal = signaler.check (); + if (signal == signaler_t::no_signal) + break; + + // Process all the commands from the thread that sent the signal. + command_t cmd; + while (dispatcher->read (signal, thread_slot, &cmd)) + cmd.destination->process_command (cmd); } } diff --git a/src/io_thread.hpp b/src/io_thread.hpp index deb03a1..7e105b3 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -38,7 +38,7 @@ namespace zmq { public: - io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); + io_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_); // Clean-up. If the thread was started, it's neccessary to call 'stop' // before invoking destructor. Otherwise the destructor would hang up. diff --git a/src/object.cpp b/src/object.cpp index 113a456..38eb1f8 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -28,7 +28,7 @@ #include "session.hpp" #include "socket_base.hpp" -zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) : +zmq::object_t::object_t (dispatcher_t *dispatcher_, uint32_t thread_slot_) : dispatcher (dispatcher_), thread_slot (thread_slot_) { @@ -44,12 +44,7 @@ zmq::object_t::~object_t () { } -int zmq::object_t::thread_slot_count () -{ - return dispatcher->thread_slot_count (); -} - -int zmq::object_t::get_thread_slot () +uint32_t zmq::object_t::get_thread_slot () { return thread_slot; } @@ -162,7 +157,7 @@ void zmq::object_t::send_stop () { // 'stop' command goes always from administrative thread to // the current object. - int admin_thread_id = dispatcher->thread_slot_count () - 1; + uint32_t admin_thread_id = dispatcher->thread_slot_count () - 1; command_t cmd; cmd.destination = this; cmd.type = command_t::stop; @@ -375,7 +370,7 @@ void zmq::object_t::process_seqnum () void zmq::object_t::send_command (command_t &cmd_) { - int destination_thread_slot = cmd_.destination->get_thread_slot (); + uint32_t destination_thread_slot = cmd_.destination->get_thread_slot (); dispatcher->write (thread_slot, destination_thread_slot, cmd_); } diff --git a/src/object.hpp b/src/object.hpp index f29342e..b29c6b8 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -32,11 +32,11 @@ namespace zmq { public: - object_t (class dispatcher_t *dispatcher_, int thread_slot_); + object_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_); object_t (object_t *parent_); virtual ~object_t (); - int get_thread_slot (); + uint32_t get_thread_slot (); dispatcher_t *get_dispatcher (); void process_command (struct command_t &cmd_); @@ -52,9 +52,6 @@ namespace zmq void unregister_endpoints (class socket_base_t *socket_); class socket_base_t *find_endpoint (const char *addr_); - // Returns number of thead slots in the dispatcher. - int thread_slot_count (); - // Chooses least loaded I/O thread. class io_thread_t *choose_io_thread (uint64_t taskset_); @@ -106,7 +103,7 @@ namespace zmq class dispatcher_t *dispatcher; // Slot ID of the thread the object belongs to. - int thread_slot; + uint32_t thread_slot; private: diff --git a/src/signaler.cpp b/src/signaler.cpp index 1d3b32f..2d89eca 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -32,9 +32,60 @@ #include <fcntl.h> #endif +const uint32_t zmq::signaler_t::no_signal = 0xffffffff; + +uint32_t zmq::signaler_t::poll () +{ + // Return next signal. + if (current != count) { + uint32_t result = buffer [current]; + current++; + return result; + } + + // If there is no signal buffered, poll for new signals. + xpoll (); + + // Return first signal. + zmq_assert (current != count); + uint32_t result = buffer [current]; + current++; + return result; +} + +uint32_t zmq::signaler_t::check () +{ + // Return next signal. + if (current != count) { + uint32_t result = buffer [current]; + current++; + return result; + } + + // If there is no signal buffered, check whether more signals + // can be obtained. + xcheck (); + + // Return first signal if any. + if (current != count) { + uint32_t result = buffer [current]; + current++; + return result; + } + + return no_signal; +} + +zmq::fd_t zmq::signaler_t::get_fd () +{ + return r; +} + #if defined ZMQ_HAVE_WINDOWS -zmq::signaler_t::signaler_t () +zmq::signaler_t::signaler_t () : + current (0), + count (0) { // Windows have no 'socketpair' function. CreatePipe is no good as pipe // handles cannot be polled on. Here we create the socketpair by hand. @@ -95,18 +146,16 @@ zmq::signaler_t::~signaler_t () wsa_assert (rc != SOCKET_ERROR); } -void zmq::signaler_t::signal (int signal_) +void zmq::signaler_t::signal (uint32_t signal_) { // TODO: Note that send is a blocking operation. // How should we behave if the signal cannot be written to the signaler? - - zmq_assert (signal_ >= 0 && signal_ < 64); - char c = (char) signal_; - int rc = send (w, &c, 1, 0); + int rc = send (w, &signal_, sizeof (signal_), 0); win_assert (rc != SOCKET_ERROR); + zmq_assert (rc == sizeof (signal_)); } -uint64_t zmq::signaler_t::poll () +void zmq::signaler_t::xpoll () { // Switch to blocking mode. unsigned long argp = 0; @@ -115,8 +164,8 @@ uint64_t zmq::signaler_t::poll () // Get the signals. Given that we are in the blocking mode now, // there should be at least a single signal returned. - uint64_t signals = check (); - zmq_assert (signals); + xcheck (); + zmq_assert (current != count); // Switch back to non-blocking mode. argp = 1; @@ -126,25 +175,24 @@ uint64_t zmq::signaler_t::poll () return signals; } -uint64_t zmq::signaler_t::check () +void zmq::signaler_t::xcheck () { - unsigned char buffer [32]; - int nbytes = recv (r, (char*) buffer, 32, 0); - if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) - return 0; - wsa_assert (nbytes != -1); + int nbytes = recv (r, (char*) buffer, sizeof (buffer), 0); - uint64_t signals = 0; - for (int pos = 0; pos != nbytes; pos++) { - zmq_assert (buffer [pos] < 64); - signals |= (uint64_t (1) << (buffer [pos])); + // No signals are available. + if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) { + current = 0; + count = 0; + return; } - return signals; -} -zmq::fd_t zmq::signaler_t::get_fd () -{ - return r; + wsa_assert (nbytes != -1); + + // Check whether we haven't got half of a signal. + zmq_assert (nbytes % sizeof (uint32_t) == 0); + + current = 0; + count = nbytes / sizeof (uint32_t); } #elif defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX @@ -152,7 +200,9 @@ zmq::fd_t zmq::signaler_t::get_fd () #include <sys/types.h> #include <sys/socket.h> -zmq::signaler_t::signaler_t () +zmq::signaler_t::signaler_t () : + current (0), + count (0) { int sv [2]; int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); @@ -174,15 +224,14 @@ zmq::signaler_t::~signaler_t () close (r); } -void zmq::signaler_t::signal (int signal_) +void zmq::signaler_t::signal (uint32_t signal_) { - zmq_assert (signal_ >= 0 && signal_ < 64); - unsigned char c = (unsigned char) signal_; - ssize_t nbytes = send (w, &c, 1, 0); - errno_assert (nbytes == 1); + ssize_t nbytes = send (w, &signal_, sizeof (signal_), 0); + errno_assert (nbytes != -1); + zmq_assert (nbytes == sizeof (signal_); } -uint64_t zmq::signaler_t::poll () +void zmq::signaler_t::xpoll () { // Set the reader to blocking mode. int flags = fcntl (r, F_GETFL, 0); @@ -192,7 +241,8 @@ uint64_t zmq::signaler_t::poll () errno_assert (rc != -1); // Poll for events. - uint64_t signals = check (); + xcheck (); + zmq_assert (current != count); // Set the reader to non-blocking mode. flags = fcntl (r, F_GETFL, 0); @@ -200,29 +250,23 @@ uint64_t zmq::signaler_t::poll () flags = 0; rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); errno_assert (rc != -1); - - return signals; } -uint64_t zmq::signaler_t::check () +void zmq::signaler_t::xcheck () { - unsigned char buffer [64]; - ssize_t nbytes = recv (r, buffer, 64, 0); - if (nbytes == -1 && errno == EAGAIN) - return 0; + ssize_t nbytes = recv (r, buffer, sizeof (buffer), 0); + if (nbytes == -1 && errno == EAGAIN) { + current = 0; + count = 0; + return; + } zmq_assert (nbytes != -1); - uint64_t signals = 0; - for (int pos = 0; pos != nbytes; pos ++) { - zmq_assert (buffer [pos] < 64); - signals |= (uint64_t (1) << (buffer [pos])); - } - return signals; -} + // Check whether we haven't got half of a signal. + zmq_assert (nbytes % sizeof (uint32_t) == 0); -zmq::fd_t zmq::signaler_t::get_fd () -{ - return r; + current = 0; + count = nbytes / sizeof (uint32_t); } #else @@ -230,7 +274,9 @@ zmq::fd_t zmq::signaler_t::get_fd () #include <sys/types.h> #include <sys/socket.h> -zmq::signaler_t::signaler_t () +zmq::signaler_t::signaler_t () : + current (0), + count (0) { int sv [2]; int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); @@ -245,50 +291,42 @@ zmq::signaler_t::~signaler_t () close (r); } -void zmq::signaler_t::signal (int signal_) +void zmq::signaler_t::signal (uint32_t signal_) { // TODO: Note that send is a blocking operation. // How should we behave if the signal cannot be written to the signaler? - - zmq_assert (signal_ >= 0 && signal_ < 64); - unsigned char c = (unsigned char) signal_; - ssize_t nbytes = send (w, &c, 1, 0); - errno_assert (nbytes == 1); + ssize_t nbytes = send (w, &signal_, sizeof (signal_), 0); + errno_assert (nbytes != -1); + zmq_assert (nbytes == sizeof (signal_)); } -uint64_t zmq::signaler_t::poll () +void zmq::signaler_t::xpoll () { - unsigned char buffer [64]; - ssize_t nbytes = recv (r, buffer, 64, 0); - zmq_assert (nbytes != -1); + ssize_t nbytes = recv (r, buffer, sizeof (buffer), 0); + errno_assert (nbytes != -1); + + // Check whether we haven't got half of a signal. + zmq_assert (nbytes % sizeof (uint32_t) == 0); - uint64_t signals = 0; - for (int pos = 0; pos != nbytes; pos ++) { - zmq_assert (buffer [pos] < 64); - signals |= (uint64_t (1) << (buffer [pos])); - } - return signals; + current = 0; + count = nbytes / sizeof (uint32_t); } -uint64_t zmq::signaler_t::check () +void zmq::signaler_t::xcheck () { - unsigned char buffer [64]; ssize_t nbytes = recv (r, buffer, 64, MSG_DONTWAIT); - if (nbytes == -1 && errno == EAGAIN) - return 0; - zmq_assert (nbytes != -1); - - uint64_t signals = 0; - for (int pos = 0; pos != nbytes; pos ++) { - zmq_assert (buffer [pos] < 64); - signals |= (uint64_t (1) << (buffer [pos])); + if (nbytes == -1 && errno == EAGAIN) { + current = 0; + count = 0; + return; } - return signals; -} + errno_assert (nbytes != -1); -zmq::fd_t zmq::signaler_t::get_fd () -{ - return r; + // Check whether we haven't got half of a signal. + zmq_assert (nbytes % sizeof (uint32_t) == 0); + + current = 0; + count = nbytes / sizeof (uint32_t); } #endif diff --git a/src/signaler.hpp b/src/signaler.hpp index f239771..5509894 100644 --- a/src/signaler.hpp +++ b/src/signaler.hpp @@ -20,9 +20,12 @@ #ifndef __ZMQ_SIGNALER_HPP_INCLUDED__ #define __ZMQ_SIGNALER_HPP_INCLUDED__ +#include <stddef.h> + #include "platform.hpp" #include "fd.hpp" #include "stdint.hpp" +#include "config.hpp" namespace zmq { @@ -39,14 +42,18 @@ namespace zmq signaler_t (); ~signaler_t (); - // i_signaler interface implementation. - void signal (int signal_); - uint64_t poll (); - uint64_t check (); + static const uint32_t no_signal; + + void signal (uint32_t signal_); + uint32_t poll (); + uint32_t check (); fd_t get_fd (); private: + void xpoll (); + void xcheck (); + #if defined ZMQ_HAVE_OPENVMS // Whilst OpenVMS supports socketpair - it maps to AF_INET only. @@ -64,6 +71,15 @@ namespace zmq fd_t w; fd_t r; + // Signal buffer. + uint32_t buffer [signal_buffer_size]; + + // Position of the next signal in the buffer to return to the user. + size_t current; + + // Number of signals in the signal buffer. + size_t count; + // Disable copying of fd_signeler object. signaler_t (const signaler_t&); void operator = (const signaler_t&); diff --git a/src/zmq.cpp b/src/zmq.cpp index c05af25..d087d53 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -262,7 +262,7 @@ void *zmq_init (int app_threads_, int io_threads_, int flags_) // Create 0MQ context. zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t ( - app_threads_, io_threads_); + (uint32_t) app_threads_, (uint32_t) io_threads_); zmq_assert (dispatcher); return (void*) dispatcher; } |