summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/app_thread.cpp25
-rw-r--r--src/app_thread.hpp2
-rw-r--r--src/config.hpp4
-rw-r--r--src/dispatcher.cpp16
-rw-r--r--src/dispatcher.hpp10
-rw-r--r--src/io_thread.cpp30
-rw-r--r--src/io_thread.hpp2
-rw-r--r--src/object.cpp13
-rw-r--r--src/object.hpp9
-rw-r--r--src/signaler.cpp200
-rw-r--r--src/signaler.hpp24
-rw-r--r--src/zmq.cpp2
12 files changed, 190 insertions, 147 deletions
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 10068c0..1350248 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -57,7 +57,8 @@
#define ZMQ_DELAY_COMMANDS
#endif
-zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
+zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_,
+ uint32_t thread_slot_) :
object_t (dispatcher_, thread_slot_),
last_processing_time (0),
terminated (false)
@@ -81,9 +82,9 @@ zmq::signaler_t *zmq::app_thread_t::get_signaler ()
bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)
{
- uint64_t signals;
+ uint32_t signal;
if (block_)
- signals = signaler.poll ();
+ signal = signaler.poll ();
else {
#if defined ZMQ_DELAY_COMMANDS
@@ -116,20 +117,14 @@ bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)
#endif
// Check whether there are any commands pending for this thread.
- signals = signaler.check ();
+ signal = signaler.check ();
}
- if (signals) {
-
- // Traverse all the possible sources of commands and process
- // all the commands from all of them.
- for (int i = 0; i != thread_slot_count (); i++) {
- if (signals & (uint64_t (1) << i)) {
- command_t cmd;
- while (dispatcher->read (i, get_thread_slot (), &cmd))
- cmd.destination->process_command (cmd);
- }
- }
+ // Process all the commands from the signaling source if there is one.
+ if (signal != signaler_t::no_signal) {
+ command_t cmd;
+ while (dispatcher->read (signal, get_thread_slot (), &cmd))
+ cmd.destination->process_command (cmd);
}
return !terminated;
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index 2bca757..bca6947 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -34,7 +34,7 @@ namespace zmq
{
public:
- app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
+ app_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
~app_thread_t ();
diff --git a/src/config.hpp b/src/config.hpp
index 12e29ca..99c9d86 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -37,6 +37,10 @@ namespace zmq
// footprint of dispatcher.
command_pipe_granularity = 4,
+ // Number of signals that can be read by the signaler
+ // using a single system call.
+ signal_buffer_size = 8,
+
// Determines how often does socket poll for new commands when it
// still has unprocessed messages to handle. Thus, if it is set to 100,
// socket will process 100 inbound messages before doing the poll.
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 1e11619..b1ba11f 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -33,7 +33,7 @@
#include "windows.h"
#endif
-zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
+zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) :
sockets (0),
terminated (false)
{
@@ -49,7 +49,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
#endif
// Create application thread proxies.
- for (int i = 0; i != app_threads_; i++) {
+ for (uint32_t i = 0; i != app_threads_; i++) {
app_thread_info_t info;
info.associated = false;
info.app_thread = new (std::nothrow) app_thread_t (this, i);
@@ -59,7 +59,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
}
// Create I/O thread objects.
- for (int i = 0; i != io_threads_; i++) {
+ for (uint32_t i = 0; i != io_threads_; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this,
i + app_threads_);
zmq_assert (io_thread);
@@ -79,7 +79,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
zmq_assert (command_pipes);
// Launch I/O threads.
- for (int i = 0; i != io_threads_; i++)
+ for (uint32_t i = 0; i != io_threads_; i++)
io_threads [i]->start ();
}
@@ -136,9 +136,9 @@ zmq::dispatcher_t::~dispatcher_t ()
#endif
}
-int zmq::dispatcher_t::thread_slot_count ()
+uint32_t zmq::dispatcher_t::thread_slot_count ()
{
- return signalers.size ();
+ return (uint32_t) signalers.size ();
}
zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
@@ -213,7 +213,7 @@ void zmq::dispatcher_t::no_sockets (app_thread_t *thread_)
app_threads_sync.unlock ();
}
-void zmq::dispatcher_t::write (int source_, int destination_,
+void zmq::dispatcher_t::write (uint32_t source_, uint32_t destination_,
const command_t &command_)
{
command_pipe_t &pipe =
@@ -223,7 +223,7 @@ void zmq::dispatcher_t::write (int source_, int destination_,
signalers [destination_]->signal (source_);
}
-bool zmq::dispatcher_t::read (int source_, int destination_,
+bool zmq::dispatcher_t::read (uint32_t source_, uint32_t destination_,
command_t *command_)
{
return command_pipes [source_ * signalers.size () +
diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp
index ff08abc..0a1ed5c 100644
--- a/src/dispatcher.hpp
+++ b/src/dispatcher.hpp
@@ -51,7 +51,7 @@ namespace zmq
// Create the dispatcher object. Matrix of pipes to communicate between
// each socket and each I/O thread is created along with appropriate
// signalers.
- dispatcher_t (int app_threads_, int io_threads_);
+ dispatcher_t (uint32_t app_threads_, uint32_t io_threads_);
// This function is called when user invokes zmq_term. If there are
// no more sockets open it'll cause all the infrastructure to be shut
@@ -72,14 +72,16 @@ namespace zmq
// Returns number of thread slots in the dispatcher. To be used by
// individual threads to find out how many distinct signals can be
// received.
- int thread_slot_count ();
+ uint32_t thread_slot_count ();
// Send command from the source to the destination.
- void write (int source_, int destination_, const command_t &command_);
+ void write (uint32_t source_, uint32_t destination_,
+ const command_t &command_);
// Receive command from the source. Returns false if there is no
// command available.
- bool read (int source_, int destination_, command_t *command_);
+ bool read (uint32_t source_, uint32_t destination_,
+ command_t *command_);
// Returns the I/O thread that is the least busy at the moment.
// Taskset specifies which I/O threads are eligible (0 = all).
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index 7d997ad..e9f9aa5 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -28,7 +28,8 @@
#include "command.hpp"
#include "dispatcher.hpp"
-zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
+zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_,
+ uint32_t thread_slot_) :
object_t (dispatcher_, thread_slot_)
{
poller = new (std::nothrow) poller_t;
@@ -66,22 +67,17 @@ int zmq::io_thread_t::get_load ()
void zmq::io_thread_t::in_event ()
{
- // Find out which threads are sending us commands.
- uint64_t signals = signaler.check ();
- zmq_assert (signals);
-
- // Iterate through all the threads in the process and find out
- // which of them sent us commands.
- int slot_count = thread_slot_count ();
- for (int source_thread_slot = 0;
- source_thread_slot != slot_count; source_thread_slot++) {
- if (signals & (uint64_t (1) << source_thread_slot)) {
-
- // Read all the commands from particular thread.
- command_t cmd;
- while (dispatcher->read (source_thread_slot, thread_slot, &cmd))
- cmd.destination->process_command (cmd);
- }
+ while (true) {
+
+ // Get the next signal.
+ uint32_t signal = signaler.check ();
+ if (signal == signaler_t::no_signal)
+ break;
+
+ // Process all the commands from the thread that sent the signal.
+ command_t cmd;
+ while (dispatcher->read (signal, thread_slot, &cmd))
+ cmd.destination->process_command (cmd);
}
}
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index deb03a1..7e105b3 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -38,7 +38,7 @@ namespace zmq
{
public:
- io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
+ io_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
// Clean-up. If the thread was started, it's neccessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up.
diff --git a/src/object.cpp b/src/object.cpp
index 113a456..38eb1f8 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -28,7 +28,7 @@
#include "session.hpp"
#include "socket_base.hpp"
-zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) :
+zmq::object_t::object_t (dispatcher_t *dispatcher_, uint32_t thread_slot_) :
dispatcher (dispatcher_),
thread_slot (thread_slot_)
{
@@ -44,12 +44,7 @@ zmq::object_t::~object_t ()
{
}
-int zmq::object_t::thread_slot_count ()
-{
- return dispatcher->thread_slot_count ();
-}
-
-int zmq::object_t::get_thread_slot ()
+uint32_t zmq::object_t::get_thread_slot ()
{
return thread_slot;
}
@@ -162,7 +157,7 @@ void zmq::object_t::send_stop ()
{
// 'stop' command goes always from administrative thread to
// the current object.
- int admin_thread_id = dispatcher->thread_slot_count () - 1;
+ uint32_t admin_thread_id = dispatcher->thread_slot_count () - 1;
command_t cmd;
cmd.destination = this;
cmd.type = command_t::stop;
@@ -375,7 +370,7 @@ void zmq::object_t::process_seqnum ()
void zmq::object_t::send_command (command_t &cmd_)
{
- int destination_thread_slot = cmd_.destination->get_thread_slot ();
+ uint32_t destination_thread_slot = cmd_.destination->get_thread_slot ();
dispatcher->write (thread_slot, destination_thread_slot, cmd_);
}
diff --git a/src/object.hpp b/src/object.hpp
index f29342e..b29c6b8 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -32,11 +32,11 @@ namespace zmq
{
public:
- object_t (class dispatcher_t *dispatcher_, int thread_slot_);
+ object_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
object_t (object_t *parent_);
virtual ~object_t ();
- int get_thread_slot ();
+ uint32_t get_thread_slot ();
dispatcher_t *get_dispatcher ();
void process_command (struct command_t &cmd_);
@@ -52,9 +52,6 @@ namespace zmq
void unregister_endpoints (class socket_base_t *socket_);
class socket_base_t *find_endpoint (const char *addr_);
- // Returns number of thead slots in the dispatcher.
- int thread_slot_count ();
-
// Chooses least loaded I/O thread.
class io_thread_t *choose_io_thread (uint64_t taskset_);
@@ -106,7 +103,7 @@ namespace zmq
class dispatcher_t *dispatcher;
// Slot ID of the thread the object belongs to.
- int thread_slot;
+ uint32_t thread_slot;
private:
diff --git a/src/signaler.cpp b/src/signaler.cpp
index 1d3b32f..2d89eca 100644
--- a/src/signaler.cpp
+++ b/src/signaler.cpp
@@ -32,9 +32,60 @@
#include <fcntl.h>
#endif
+const uint32_t zmq::signaler_t::no_signal = 0xffffffff;
+
+uint32_t zmq::signaler_t::poll ()
+{
+ // Return next signal.
+ if (current != count) {
+ uint32_t result = buffer [current];
+ current++;
+ return result;
+ }
+
+ // If there is no signal buffered, poll for new signals.
+ xpoll ();
+
+ // Return first signal.
+ zmq_assert (current != count);
+ uint32_t result = buffer [current];
+ current++;
+ return result;
+}
+
+uint32_t zmq::signaler_t::check ()
+{
+ // Return next signal.
+ if (current != count) {
+ uint32_t result = buffer [current];
+ current++;
+ return result;
+ }
+
+ // If there is no signal buffered, check whether more signals
+ // can be obtained.
+ xcheck ();
+
+ // Return first signal if any.
+ if (current != count) {
+ uint32_t result = buffer [current];
+ current++;
+ return result;
+ }
+
+ return no_signal;
+}
+
+zmq::fd_t zmq::signaler_t::get_fd ()
+{
+ return r;
+}
+
#if defined ZMQ_HAVE_WINDOWS
-zmq::signaler_t::signaler_t ()
+zmq::signaler_t::signaler_t () :
+ current (0),
+ count (0)
{
// Windows have no 'socketpair' function. CreatePipe is no good as pipe
// handles cannot be polled on. Here we create the socketpair by hand.
@@ -95,18 +146,16 @@ zmq::signaler_t::~signaler_t ()
wsa_assert (rc != SOCKET_ERROR);
}
-void zmq::signaler_t::signal (int signal_)
+void zmq::signaler_t::signal (uint32_t signal_)
{
// TODO: Note that send is a blocking operation.
// How should we behave if the signal cannot be written to the signaler?
-
- zmq_assert (signal_ >= 0 && signal_ < 64);
- char c = (char) signal_;
- int rc = send (w, &c, 1, 0);
+ int rc = send (w, &signal_, sizeof (signal_), 0);
win_assert (rc != SOCKET_ERROR);
+ zmq_assert (rc == sizeof (signal_));
}
-uint64_t zmq::signaler_t::poll ()
+void zmq::signaler_t::xpoll ()
{
// Switch to blocking mode.
unsigned long argp = 0;
@@ -115,8 +164,8 @@ uint64_t zmq::signaler_t::poll ()
// Get the signals. Given that we are in the blocking mode now,
// there should be at least a single signal returned.
- uint64_t signals = check ();
- zmq_assert (signals);
+ xcheck ();
+ zmq_assert (current != count);
// Switch back to non-blocking mode.
argp = 1;
@@ -126,25 +175,24 @@ uint64_t zmq::signaler_t::poll ()
return signals;
}
-uint64_t zmq::signaler_t::check ()
+void zmq::signaler_t::xcheck ()
{
- unsigned char buffer [32];
- int nbytes = recv (r, (char*) buffer, 32, 0);
- if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK)
- return 0;
- wsa_assert (nbytes != -1);
+ int nbytes = recv (r, (char*) buffer, sizeof (buffer), 0);
- uint64_t signals = 0;
- for (int pos = 0; pos != nbytes; pos++) {
- zmq_assert (buffer [pos] < 64);
- signals |= (uint64_t (1) << (buffer [pos]));
+ // No signals are available.
+ if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) {
+ current = 0;
+ count = 0;
+ return;
}
- return signals;
-}
-zmq::fd_t zmq::signaler_t::get_fd ()
-{
- return r;
+ wsa_assert (nbytes != -1);
+
+ // Check whether we haven't got half of a signal.
+ zmq_assert (nbytes % sizeof (uint32_t) == 0);
+
+ current = 0;
+ count = nbytes / sizeof (uint32_t);
}
#elif defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX
@@ -152,7 +200,9 @@ zmq::fd_t zmq::signaler_t::get_fd ()
#include <sys/types.h>
#include <sys/socket.h>
-zmq::signaler_t::signaler_t ()
+zmq::signaler_t::signaler_t () :
+ current (0),
+ count (0)
{
int sv [2];
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
@@ -174,15 +224,14 @@ zmq::signaler_t::~signaler_t ()
close (r);
}
-void zmq::signaler_t::signal (int signal_)
+void zmq::signaler_t::signal (uint32_t signal_)
{
- zmq_assert (signal_ >= 0 && signal_ < 64);
- unsigned char c = (unsigned char) signal_;
- ssize_t nbytes = send (w, &c, 1, 0);
- errno_assert (nbytes == 1);
+ ssize_t nbytes = send (w, &signal_, sizeof (signal_), 0);
+ errno_assert (nbytes != -1);
+ zmq_assert (nbytes == sizeof (signal_);
}
-uint64_t zmq::signaler_t::poll ()
+void zmq::signaler_t::xpoll ()
{
// Set the reader to blocking mode.
int flags = fcntl (r, F_GETFL, 0);
@@ -192,7 +241,8 @@ uint64_t zmq::signaler_t::poll ()
errno_assert (rc != -1);
// Poll for events.
- uint64_t signals = check ();
+ xcheck ();
+ zmq_assert (current != count);
// Set the reader to non-blocking mode.
flags = fcntl (r, F_GETFL, 0);
@@ -200,29 +250,23 @@ uint64_t zmq::signaler_t::poll ()
flags = 0;
rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc != -1);
-
- return signals;
}
-uint64_t zmq::signaler_t::check ()
+void zmq::signaler_t::xcheck ()
{
- unsigned char buffer [64];
- ssize_t nbytes = recv (r, buffer, 64, 0);
- if (nbytes == -1 && errno == EAGAIN)
- return 0;
+ ssize_t nbytes = recv (r, buffer, sizeof (buffer), 0);
+ if (nbytes == -1 && errno == EAGAIN) {
+ current = 0;
+ count = 0;
+ return;
+ }
zmq_assert (nbytes != -1);
- uint64_t signals = 0;
- for (int pos = 0; pos != nbytes; pos ++) {
- zmq_assert (buffer [pos] < 64);
- signals |= (uint64_t (1) << (buffer [pos]));
- }
- return signals;
-}
+ // Check whether we haven't got half of a signal.
+ zmq_assert (nbytes % sizeof (uint32_t) == 0);
-zmq::fd_t zmq::signaler_t::get_fd ()
-{
- return r;
+ current = 0;
+ count = nbytes / sizeof (uint32_t);
}
#else
@@ -230,7 +274,9 @@ zmq::fd_t zmq::signaler_t::get_fd ()
#include <sys/types.h>
#include <sys/socket.h>
-zmq::signaler_t::signaler_t ()
+zmq::signaler_t::signaler_t () :
+ current (0),
+ count (0)
{
int sv [2];
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
@@ -245,50 +291,42 @@ zmq::signaler_t::~signaler_t ()
close (r);
}
-void zmq::signaler_t::signal (int signal_)
+void zmq::signaler_t::signal (uint32_t signal_)
{
// TODO: Note that send is a blocking operation.
// How should we behave if the signal cannot be written to the signaler?
-
- zmq_assert (signal_ >= 0 && signal_ < 64);
- unsigned char c = (unsigned char) signal_;
- ssize_t nbytes = send (w, &c, 1, 0);
- errno_assert (nbytes == 1);
+ ssize_t nbytes = send (w, &signal_, sizeof (signal_), 0);
+ errno_assert (nbytes != -1);
+ zmq_assert (nbytes == sizeof (signal_));
}
-uint64_t zmq::signaler_t::poll ()
+void zmq::signaler_t::xpoll ()
{
- unsigned char buffer [64];
- ssize_t nbytes = recv (r, buffer, 64, 0);
- zmq_assert (nbytes != -1);
+ ssize_t nbytes = recv (r, buffer, sizeof (buffer), 0);
+ errno_assert (nbytes != -1);
+
+ // Check whether we haven't got half of a signal.
+ zmq_assert (nbytes % sizeof (uint32_t) == 0);
- uint64_t signals = 0;
- for (int pos = 0; pos != nbytes; pos ++) {
- zmq_assert (buffer [pos] < 64);
- signals |= (uint64_t (1) << (buffer [pos]));
- }
- return signals;
+ current = 0;
+ count = nbytes / sizeof (uint32_t);
}
-uint64_t zmq::signaler_t::check ()
+void zmq::signaler_t::xcheck ()
{
- unsigned char buffer [64];
ssize_t nbytes = recv (r, buffer, 64, MSG_DONTWAIT);
- if (nbytes == -1 && errno == EAGAIN)
- return 0;
- zmq_assert (nbytes != -1);
-
- uint64_t signals = 0;
- for (int pos = 0; pos != nbytes; pos ++) {
- zmq_assert (buffer [pos] < 64);
- signals |= (uint64_t (1) << (buffer [pos]));
+ if (nbytes == -1 && errno == EAGAIN) {
+ current = 0;
+ count = 0;
+ return;
}
- return signals;
-}
+ errno_assert (nbytes != -1);
-zmq::fd_t zmq::signaler_t::get_fd ()
-{
- return r;
+ // Check whether we haven't got half of a signal.
+ zmq_assert (nbytes % sizeof (uint32_t) == 0);
+
+ current = 0;
+ count = nbytes / sizeof (uint32_t);
}
#endif
diff --git a/src/signaler.hpp b/src/signaler.hpp
index f239771..5509894 100644
--- a/src/signaler.hpp
+++ b/src/signaler.hpp
@@ -20,9 +20,12 @@
#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__
#define __ZMQ_SIGNALER_HPP_INCLUDED__
+#include <stddef.h>
+
#include "platform.hpp"
#include "fd.hpp"
#include "stdint.hpp"
+#include "config.hpp"
namespace zmq
{
@@ -39,14 +42,18 @@ namespace zmq
signaler_t ();
~signaler_t ();
- // i_signaler interface implementation.
- void signal (int signal_);
- uint64_t poll ();
- uint64_t check ();
+ static const uint32_t no_signal;
+
+ void signal (uint32_t signal_);
+ uint32_t poll ();
+ uint32_t check ();
fd_t get_fd ();
private:
+ void xpoll ();
+ void xcheck ();
+
#if defined ZMQ_HAVE_OPENVMS
// Whilst OpenVMS supports socketpair - it maps to AF_INET only.
@@ -64,6 +71,15 @@ namespace zmq
fd_t w;
fd_t r;
+ // Signal buffer.
+ uint32_t buffer [signal_buffer_size];
+
+ // Position of the next signal in the buffer to return to the user.
+ size_t current;
+
+ // Number of signals in the signal buffer.
+ size_t count;
+
// Disable copying of fd_signeler object.
signaler_t (const signaler_t&);
void operator = (const signaler_t&);
diff --git a/src/zmq.cpp b/src/zmq.cpp
index c05af25..d087d53 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -262,7 +262,7 @@ void *zmq_init (int app_threads_, int io_threads_, int flags_)
// Create 0MQ context.
zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t (
- app_threads_, io_threads_);
+ (uint32_t) app_threads_, (uint32_t) io_threads_);
zmq_assert (dispatcher);
return (void*) dispatcher;
}