diff options
Diffstat (limited to 'src/dispatcher.cpp')
-rw-r--r-- | src/dispatcher.cpp | 53 |
1 files changed, 30 insertions, 23 deletions
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index db06d32..2ae99ba 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -34,7 +34,7 @@ #include "windows.h" #endif -zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) : +zmq::dispatcher_t::dispatcher_t (uint32_t io_threads_) : sockets (0), terminated (false) { @@ -50,33 +50,19 @@ zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) : #endif // Initialise the array of signalers. - signalers_count = app_threads_ + io_threads_; + signalers_count = max_app_threads + io_threads_; signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count); zmq_assert (signalers); memset (signalers, 0, sizeof (signaler_t*) * signalers_count); - // Create I/O thread objects. + // Create I/O thread objects and launch them. for (uint32_t i = 0; i != io_threads_; i++) { io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); zmq_assert (io_thread); io_threads.push_back (io_thread); signalers [i] = io_thread->get_signaler (); + io_thread->start (); } - - // Create application thread proxies. - for (uint32_t i = 0; i != app_threads_; i++) { - app_thread_info_t info; - info.associated = false; - info.app_thread = new (std::nothrow) app_thread_t (this, - i + io_threads_); - zmq_assert (info.app_thread); - app_threads.push_back (info); - signalers [i + io_threads_] = info.app_thread->get_signaler (); - } - - // Launch I/O threads. - for (uint32_t i = 0; i != io_threads_; i++) - io_threads [i]->start (); } int zmq::dispatcher_t::term () @@ -152,14 +138,35 @@ zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_) // If no app_thread_t is associated with the calling thread, // associate it with one of the unused app_thread_t objects. if (current == app_threads.size ()) { + + // If all the existing app_threads are already used, create one more. if (unused == app_threads.size ()) { - app_threads_sync.unlock (); - errno = EMTHREAD; - return NULL; + + // If max_app_threads limit was reached, return error. + if (app_threads.size () == max_app_threads) { + app_threads_sync.unlock (); + errno = EMTHREAD; + return NULL; + } + + // Create the new application thread proxy object. + app_thread_info_t info; + info.associated = false; + info.app_thread = new (std::nothrow) app_thread_t (this, + io_threads.size () + app_threads.size ()); + zmq_assert (info.app_thread); + signalers [io_threads.size () + app_threads.size ()] = + info.app_thread->get_signaler (); + app_threads.push_back (info); } - app_threads [unused].associated = true; - app_threads [unused].tid = thread_t::id (); + + // Incidentally, this works both when there is an unused app_thread + // and when a new one is created. current = unused; + + // Associate the selected app_thread with the OS thread. + app_threads [current].associated = true; + app_threads [current].tid = thread_t::id (); } app_thread_t *thread = app_threads [current].app_thread; |