diff options
-rw-r--r-- | src/app_thread.cpp | 23 | ||||
-rw-r--r-- | src/app_thread.hpp | 19 | ||||
-rw-r--r-- | src/dispatcher.cpp | 79 | ||||
-rw-r--r-- | src/dispatcher.hpp | 29 |
4 files changed, 76 insertions, 74 deletions
diff --git a/src/app_thread.cpp b/src/app_thread.cpp index c55eb75..eff0cf7 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -62,7 +62,6 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_, int flags_) : object_t (dispatcher_, thread_slot_), - associated (false), last_processing_time (0) { if (flags_ & ZMQ_POLL) { @@ -87,24 +86,6 @@ zmq::i_signaler *zmq::app_thread_t::get_signaler () return signaler; } -bool zmq::app_thread_t::is_current () -{ - return !sockets.empty () && associated && - thread_t::equal (tid, thread_t::id ()); -} - -bool zmq::app_thread_t::make_current () -{ - // If there are object managed by this slot we cannot assign the slot - // to a different thread. - if (!sockets.empty ()) - return false; - - associated = true; - tid = thread_t::id (); - return true; -} - void zmq::app_thread_t::process_commands (bool block_, bool throttle_) { uint64_t signals; @@ -191,6 +172,8 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) s = new (std::nothrow) downstream_t (this); break; default: + if (sockets.empty ()) + dispatcher->no_sockets (this); errno = EINVAL; return NULL; } @@ -204,4 +187,6 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) void zmq::app_thread_t::remove_socket (socket_base_t *socket_) { sockets.erase (socket_); + if (sockets.empty ()) + dispatcher->no_sockets (this); } diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 0e2c3e1..9bd5641 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -25,7 +25,6 @@ #include "stdint.hpp" #include "object.hpp" #include "yarray.hpp" -#include "thread.hpp" namespace zmq { @@ -42,17 +41,6 @@ namespace zmq // Returns signaler associated with this application thread. struct i_signaler *get_signaler (); - // Nota bene: Following two functions are accessed from different - // threads. The caller (dispatcher) is responsible for synchronisation - // of accesses. - - // Returns true is current thread is associated with the app thread. - bool is_current (); - - // Tries to associate current thread with the app thread object. - // Returns true is successfull, false otherwise. - bool make_current (); - // Processes commands sent to this thread (if any). If 'block' is // set to true, returns only after at least one command was processed. // If throttle argument is true, commands are processed at most once @@ -71,13 +59,6 @@ namespace zmq typedef yarray_t <socket_base_t> sockets_t; sockets_t sockets; - // If false, app_thread_t object is not associated with any OS thread. - // In such case, 'tid' member contains a bogus value. - bool associated; - - // Thread ID associated with this slot. - thread_t::id_t tid; - // App thread's signaler object. struct i_signaler *signaler; diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 167b823..8aafcf8 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -51,11 +51,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_, // Create application thread proxies. for (int i = 0; i != app_threads_; i++) { - app_thread_t *app_thread = new (std::nothrow) app_thread_t (this, i, - flags_); - zmq_assert (app_thread); - app_threads.push_back (app_thread); - signalers.push_back (app_thread->get_signaler ()); + app_thread_info_t info; + info.associated = false; + info.app_thread = new (std::nothrow) app_thread_t (this, i, flags_); + zmq_assert (info.app_thread); + app_threads.push_back (info); + signalers.push_back (info.app_thread->get_signaler ()); } // Create I/O thread objects. @@ -110,7 +111,7 @@ zmq::dispatcher_t::~dispatcher_t () // Close all application theads, sockets, io_objects etc. for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - delete app_threads [i]; + delete app_threads [i].app_thread; // Deallocate all the orphaned pipes. while (!pipes.empty ()) @@ -132,13 +133,37 @@ int zmq::dispatcher_t::thread_slot_count () zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_) { - threads_sync.lock (); - app_thread_t *thread = choose_app_thread (); - if (!thread) { - threads_sync.unlock (); - return NULL; + app_threads_sync.lock (); + + // Find whether the calling thread has app_thread_t object associated + // already. At the same time find an unused app_thread_t so that it can + // be used if there's no associated object for the calling thread. + // Check whether thread ID is already assigned. If so, return it. + app_threads_t::size_type unused = app_threads.size (); + app_threads_t::size_type current; + for (current = 0; current != app_threads.size (); current++) { + if (app_threads [current].associated && + thread_t::equal (thread_t::id (), app_threads [current].tid)) + break; + if (!app_threads [current].associated) + unused = current; + } + + // If no app_thread_t is associated with the calling thread, + // associate it with one of the unused app_thread_t objects. + if (current == app_threads.size ()) { + if (unused == app_threads.size ()) { + app_threads_sync.unlock (); + errno = EMTHREAD; + return NULL; + } + app_threads [unused].associated = true; + app_threads [unused].tid = thread_t::id (); + current = unused; } - threads_sync.unlock (); + + app_thread_t *thread = app_threads [current].app_thread; + app_threads_sync.unlock (); socket_base_t *s = thread->create_socket (type_); if (!s) @@ -165,6 +190,19 @@ void zmq::dispatcher_t::destroy_socket () delete this; } +void zmq::dispatcher_t::no_sockets (app_thread_t *thread_) +{ + app_threads_sync.lock (); + app_threads_t::size_type i; + for (i = 0; i != app_threads.size (); i++) + if (app_threads [i].app_thread == thread_) { + app_threads [i].associated = false; + break; + } + zmq_assert (i != app_threads.size ()); + app_threads_sync.unlock (); +} + void zmq::dispatcher_t::write (int source_, int destination_, const command_t &command_) { @@ -182,23 +220,6 @@ bool zmq::dispatcher_t::read (int source_, int destination_, destination_].read (command_); } -zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () -{ - // Check whether thread ID is already assigned. If so, return it. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - if (app_threads [i]->is_current ()) - return app_threads [i]; - - // Check whether there's an unused thread slot in the cotext. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - if (app_threads [i]->make_current ()) - return app_threads [i]; - - // Thread limit was exceeded. - errno = EMTHREAD; - return NULL; -} - zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_) { // Find the I/O thread with minimum load. diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 2c11c57..1cd5207 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -31,6 +31,7 @@ #include "config.hpp" #include "mutex.hpp" #include "stdint.hpp" +#include "thread.hpp" namespace zmq { @@ -64,6 +65,10 @@ namespace zmq // Destroy a socket. void destroy_socket (); + // Called by app_thread_t when it has no more sockets. The function + // should disassociate the object from the current OS thread. + void no_sockets (class app_thread_t *thread_); + // Returns number of thread slots in the dispatcher. To be used by // individual threads to find out how many distinct signals can be // received. @@ -94,14 +99,27 @@ namespace zmq ~dispatcher_t (); - // Returns the app thread associated with the current thread. - // NULL if we are out of app thread slots. - class app_thread_t *choose_app_thread (); + struct app_thread_info_t + { + // If false, 0MQ application thread is free, there's no associated + // OS thread. + bool associated; + + // ID of the associated OS thread. If 'associated' is false, + // this field contains bogus data. + thread_t::id_t tid; + + // Pointer to the 0MQ application thread object. + class app_thread_t *app_thread; + }; // Application threads. - typedef std::vector <class app_thread_t*> app_threads_t; + typedef std::vector <app_thread_info_t> app_threads_t; app_threads_t app_threads; + // Synchronisation of accesses to shared application thread data. + mutex_t app_threads_sync; + // I/O threads. typedef std::vector <class io_thread_t*> io_threads_t; io_threads_t io_threads; @@ -116,9 +134,6 @@ namespace zmq // NxN matrix of command pipes. command_pipe_t *command_pipes; - // Synchronisation of accesses to shared thread data. - mutex_t threads_sync; - // As pipes may reside in orphaned state in particular moments // of the pipe shutdown process, i.e. neither pipe reader nor // pipe writer hold reference to the pipe, we have to hold references |