summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/config.hpp4
-rw-r--r--src/dispatcher.cpp53
-rw-r--r--src/dispatcher.hpp15
-rw-r--r--src/zmq.cpp6
4 files changed, 41 insertions, 37 deletions
diff --git a/src/config.hpp b/src/config.hpp
index 5b51a08..e211f34 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -27,6 +27,10 @@ namespace zmq
enum
{
+ // Maximal number of OS threads that can own 0MQ sockets
+ // at the same time.
+ max_app_threads = 512,
+
// Number of new messages in message pipe needed to trigger new memory
// allocation. Setting this parameter to 256 decreases the impact of
// memory allocation by approximately 99.6%
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index db06d32..2ae99ba 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -34,7 +34,7 @@
#include "windows.h"
#endif
-zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) :
+zmq::dispatcher_t::dispatcher_t (uint32_t io_threads_) :
sockets (0),
terminated (false)
{
@@ -50,33 +50,19 @@ zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) :
#endif
// Initialise the array of signalers.
- signalers_count = app_threads_ + io_threads_;
+ signalers_count = max_app_threads + io_threads_;
signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count);
zmq_assert (signalers);
memset (signalers, 0, sizeof (signaler_t*) * signalers_count);
- // Create I/O thread objects.
+ // Create I/O thread objects and launch them.
for (uint32_t i = 0; i != io_threads_; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
zmq_assert (io_thread);
io_threads.push_back (io_thread);
signalers [i] = io_thread->get_signaler ();
+ io_thread->start ();
}
-
- // Create application thread proxies.
- for (uint32_t i = 0; i != app_threads_; i++) {
- app_thread_info_t info;
- info.associated = false;
- info.app_thread = new (std::nothrow) app_thread_t (this,
- i + io_threads_);
- zmq_assert (info.app_thread);
- app_threads.push_back (info);
- signalers [i + io_threads_] = info.app_thread->get_signaler ();
- }
-
- // Launch I/O threads.
- for (uint32_t i = 0; i != io_threads_; i++)
- io_threads [i]->start ();
}
int zmq::dispatcher_t::term ()
@@ -152,14 +138,35 @@ zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
// If no app_thread_t is associated with the calling thread,
// associate it with one of the unused app_thread_t objects.
if (current == app_threads.size ()) {
+
+ // If all the existing app_threads are already used, create one more.
if (unused == app_threads.size ()) {
- app_threads_sync.unlock ();
- errno = EMTHREAD;
- return NULL;
+
+ // If max_app_threads limit was reached, return error.
+ if (app_threads.size () == max_app_threads) {
+ app_threads_sync.unlock ();
+ errno = EMTHREAD;
+ return NULL;
+ }
+
+ // Create the new application thread proxy object.
+ app_thread_info_t info;
+ info.associated = false;
+ info.app_thread = new (std::nothrow) app_thread_t (this,
+ io_threads.size () + app_threads.size ());
+ zmq_assert (info.app_thread);
+ signalers [io_threads.size () + app_threads.size ()] =
+ info.app_thread->get_signaler ();
+ app_threads.push_back (info);
}
- app_threads [unused].associated = true;
- app_threads [unused].tid = thread_t::id ();
+
+ // Incidentally, this works both when there is an unused app_thread
+ // and when a new one is created.
current = unused;
+
+ // Associate the selected app_thread with the OS thread.
+ app_threads [current].associated = true;
+ app_threads [current].tid = thread_t::id ();
}
app_thread_t *thread = app_threads [current].app_thread;
diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp
index 0986e8a..cad4844 100644
--- a/src/dispatcher.hpp
+++ b/src/dispatcher.hpp
@@ -34,23 +34,14 @@
namespace zmq
{
-
- // Dispatcher implements bidirectional thread-safe passing of commands
- // between N threads. It consists of a ypipes to pass commands and
- // signalers to wake up the receiver thread when new commands are
- // available. Note that dispatcher is inefficient for passing messages
- // within a thread (sender thread = receiver thread). The optimisation is
- // not part of the class and should be implemented by individual threads
- // (presumably by calling the command handling function directly).
class dispatcher_t
{
public:
- // Create the dispatcher object. Matrix of pipes to communicate between
- // each socket and each I/O thread is created along with appropriate
- // signalers.
- dispatcher_t (uint32_t app_threads_, uint32_t io_threads_);
+ // Create the dispatcher object. The argument specifies the size
+ // of I/O thread pool to create.
+ dispatcher_t (uint32_t io_threads_);
// This function is called when user invokes zmq_term. If there are
// no more sockets open it'll cause all the infrastructure to be shut
diff --git a/src/zmq.cpp b/src/zmq.cpp
index d087d53..e97cb64 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -226,7 +226,9 @@ size_t zmq_msg_size (zmq_msg_t *msg_)
return ((zmq::msg_content_t*) msg_->content)->size;
}
-void *zmq_init (int app_threads_, int io_threads_, int flags_)
+// TODO: app_threads and flags parameters are not used anymore...
+// Reflect this in the API/ABI.
+void *zmq_init (int /*app_threads_*/, int io_threads_, int /*flags_*/)
{
// There are no context flags defined at the moment, so flags_ is ignored.
@@ -262,7 +264,7 @@ void *zmq_init (int app_threads_, int io_threads_, int flags_)
// Create 0MQ context.
zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t (
- (uint32_t) app_threads_, (uint32_t) io_threads_);
+ (uint32_t) io_threads_);
zmq_assert (dispatcher);
return (void*) dispatcher;
}