summaryrefslogtreecommitdiff
path: root/src/dispatcher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/dispatcher.cpp')
-rw-r--r--src/dispatcher.cpp53
1 files changed, 30 insertions, 23 deletions
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index db06d32..2ae99ba 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -34,7 +34,7 @@
#include "windows.h"
#endif
-zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) :
+zmq::dispatcher_t::dispatcher_t (uint32_t io_threads_) :
sockets (0),
terminated (false)
{
@@ -50,33 +50,19 @@ zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) :
#endif
// Initialise the array of signalers.
- signalers_count = app_threads_ + io_threads_;
+ signalers_count = max_app_threads + io_threads_;
signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count);
zmq_assert (signalers);
memset (signalers, 0, sizeof (signaler_t*) * signalers_count);
- // Create I/O thread objects.
+ // Create I/O thread objects and launch them.
for (uint32_t i = 0; i != io_threads_; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
zmq_assert (io_thread);
io_threads.push_back (io_thread);
signalers [i] = io_thread->get_signaler ();
+ io_thread->start ();
}
-
- // Create application thread proxies.
- for (uint32_t i = 0; i != app_threads_; i++) {
- app_thread_info_t info;
- info.associated = false;
- info.app_thread = new (std::nothrow) app_thread_t (this,
- i + io_threads_);
- zmq_assert (info.app_thread);
- app_threads.push_back (info);
- signalers [i + io_threads_] = info.app_thread->get_signaler ();
- }
-
- // Launch I/O threads.
- for (uint32_t i = 0; i != io_threads_; i++)
- io_threads [i]->start ();
}
int zmq::dispatcher_t::term ()
@@ -152,14 +138,35 @@ zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
// If no app_thread_t is associated with the calling thread,
// associate it with one of the unused app_thread_t objects.
if (current == app_threads.size ()) {
+
+ // If all the existing app_threads are already used, create one more.
if (unused == app_threads.size ()) {
- app_threads_sync.unlock ();
- errno = EMTHREAD;
- return NULL;
+
+ // If max_app_threads limit was reached, return error.
+ if (app_threads.size () == max_app_threads) {
+ app_threads_sync.unlock ();
+ errno = EMTHREAD;
+ return NULL;
+ }
+
+ // Create the new application thread proxy object.
+ app_thread_info_t info;
+ info.associated = false;
+ info.app_thread = new (std::nothrow) app_thread_t (this,
+ io_threads.size () + app_threads.size ());
+ zmq_assert (info.app_thread);
+ signalers [io_threads.size () + app_threads.size ()] =
+ info.app_thread->get_signaler ();
+ app_threads.push_back (info);
}
- app_threads [unused].associated = true;
- app_threads [unused].tid = thread_t::id ();
+
+ // Incidentally, this works both when there is an unused app_thread
+ // and when a new one is created.
current = unused;
+
+ // Associate the selected app_thread with the OS thread.
+ app_threads [current].associated = true;
+ app_threads [current].tid = thread_t::id ();
}
app_thread_t *thread = app_threads [current].app_thread;