diff options
-rw-r--r-- | src/config.hpp | 4 | ||||
-rw-r--r-- | src/dispatcher.cpp | 53 | ||||
-rw-r--r-- | src/dispatcher.hpp | 15 | ||||
-rw-r--r-- | src/zmq.cpp | 6 |
4 files changed, 41 insertions, 37 deletions
diff --git a/src/config.hpp b/src/config.hpp index 5b51a08..e211f34 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -27,6 +27,10 @@ namespace zmq enum { + // Maximal number of OS threads that can own 0MQ sockets + // at the same time. + max_app_threads = 512, + // Number of new messages in message pipe needed to trigger new memory // allocation. Setting this parameter to 256 decreases the impact of // memory allocation by approximately 99.6% diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index db06d32..2ae99ba 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -34,7 +34,7 @@ #include "windows.h" #endif -zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) : +zmq::dispatcher_t::dispatcher_t (uint32_t io_threads_) : sockets (0), terminated (false) { @@ -50,33 +50,19 @@ zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) : #endif // Initialise the array of signalers. - signalers_count = app_threads_ + io_threads_; + signalers_count = max_app_threads + io_threads_; signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count); zmq_assert (signalers); memset (signalers, 0, sizeof (signaler_t*) * signalers_count); - // Create I/O thread objects. + // Create I/O thread objects and launch them. for (uint32_t i = 0; i != io_threads_; i++) { io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); zmq_assert (io_thread); io_threads.push_back (io_thread); signalers [i] = io_thread->get_signaler (); + io_thread->start (); } - - // Create application thread proxies. - for (uint32_t i = 0; i != app_threads_; i++) { - app_thread_info_t info; - info.associated = false; - info.app_thread = new (std::nothrow) app_thread_t (this, - i + io_threads_); - zmq_assert (info.app_thread); - app_threads.push_back (info); - signalers [i + io_threads_] = info.app_thread->get_signaler (); - } - - // Launch I/O threads. - for (uint32_t i = 0; i != io_threads_; i++) - io_threads [i]->start (); } int zmq::dispatcher_t::term () @@ -152,14 +138,35 @@ zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_) // If no app_thread_t is associated with the calling thread, // associate it with one of the unused app_thread_t objects. if (current == app_threads.size ()) { + + // If all the existing app_threads are already used, create one more. if (unused == app_threads.size ()) { - app_threads_sync.unlock (); - errno = EMTHREAD; - return NULL; + + // If max_app_threads limit was reached, return error. + if (app_threads.size () == max_app_threads) { + app_threads_sync.unlock (); + errno = EMTHREAD; + return NULL; + } + + // Create the new application thread proxy object. + app_thread_info_t info; + info.associated = false; + info.app_thread = new (std::nothrow) app_thread_t (this, + io_threads.size () + app_threads.size ()); + zmq_assert (info.app_thread); + signalers [io_threads.size () + app_threads.size ()] = + info.app_thread->get_signaler (); + app_threads.push_back (info); } - app_threads [unused].associated = true; - app_threads [unused].tid = thread_t::id (); + + // Incidentally, this works both when there is an unused app_thread + // and when a new one is created. current = unused; + + // Associate the selected app_thread with the OS thread. + app_threads [current].associated = true; + app_threads [current].tid = thread_t::id (); } app_thread_t *thread = app_threads [current].app_thread; diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 0986e8a..cad4844 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -34,23 +34,14 @@ namespace zmq { - - // Dispatcher implements bidirectional thread-safe passing of commands - // between N threads. It consists of a ypipes to pass commands and - // signalers to wake up the receiver thread when new commands are - // available. Note that dispatcher is inefficient for passing messages - // within a thread (sender thread = receiver thread). The optimisation is - // not part of the class and should be implemented by individual threads - // (presumably by calling the command handling function directly). class dispatcher_t { public: - // Create the dispatcher object. Matrix of pipes to communicate between - // each socket and each I/O thread is created along with appropriate - // signalers. - dispatcher_t (uint32_t app_threads_, uint32_t io_threads_); + // Create the dispatcher object. The argument specifies the size + // of I/O thread pool to create. + dispatcher_t (uint32_t io_threads_); // This function is called when user invokes zmq_term. If there are // no more sockets open it'll cause all the infrastructure to be shut diff --git a/src/zmq.cpp b/src/zmq.cpp index d087d53..e97cb64 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -226,7 +226,9 @@ size_t zmq_msg_size (zmq_msg_t *msg_) return ((zmq::msg_content_t*) msg_->content)->size; } -void *zmq_init (int app_threads_, int io_threads_, int flags_) +// TODO: app_threads and flags parameters are not used anymore... +// Reflect this in the API/ABI. +void *zmq_init (int /*app_threads_*/, int io_threads_, int /*flags_*/) { // There are no context flags defined at the moment, so flags_ is ignored. @@ -262,7 +264,7 @@ void *zmq_init (int app_threads_, int io_threads_, int flags_) // Create 0MQ context. zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t ( - (uint32_t) app_threads_, (uint32_t) io_threads_); + (uint32_t) io_threads_); zmq_assert (dispatcher); return (void*) dispatcher; } |