diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/app_thread.cpp | 23 | ||||
| -rw-r--r-- | src/app_thread.hpp | 19 | ||||
| -rw-r--r-- | src/dispatcher.cpp | 79 | ||||
| -rw-r--r-- | src/dispatcher.hpp | 29 | 
4 files changed, 76 insertions, 74 deletions
diff --git a/src/app_thread.cpp b/src/app_thread.cpp index c55eb75..eff0cf7 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -62,7 +62,6 @@  zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_,        int flags_) :      object_t (dispatcher_, thread_slot_), -    associated (false),      last_processing_time (0)  {      if (flags_ & ZMQ_POLL) { @@ -87,24 +86,6 @@ zmq::i_signaler *zmq::app_thread_t::get_signaler ()      return signaler;  } -bool zmq::app_thread_t::is_current () -{ -    return !sockets.empty () && associated && -        thread_t::equal (tid, thread_t::id ()); -} - -bool zmq::app_thread_t::make_current () -{ -    //  If there are object managed by this slot we cannot assign the slot -    //  to a different thread. -    if (!sockets.empty ()) -        return false; - -    associated = true; -    tid = thread_t::id (); -    return true; -} -  void zmq::app_thread_t::process_commands (bool block_, bool throttle_)  {      uint64_t signals; @@ -191,6 +172,8 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)          s = new (std::nothrow) downstream_t (this);          break;      default: +        if (sockets.empty ()) +            dispatcher->no_sockets (this);          errno = EINVAL;          return NULL;      } @@ -204,4 +187,6 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)  void zmq::app_thread_t::remove_socket (socket_base_t *socket_)  {      sockets.erase (socket_); +    if (sockets.empty ()) +        dispatcher->no_sockets (this);  } diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 0e2c3e1..9bd5641 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -25,7 +25,6 @@  #include "stdint.hpp"  #include "object.hpp"  #include "yarray.hpp" -#include "thread.hpp"  namespace zmq  { @@ -42,17 +41,6 @@ namespace zmq          //  Returns signaler associated with this application thread.          struct i_signaler *get_signaler (); -        //  Nota bene: Following two functions are accessed from different -        //  threads. The caller (dispatcher) is responsible for synchronisation -        //  of accesses. - -        //  Returns true is current thread is associated with the app thread. -        bool is_current (); - -        //  Tries to associate current thread with the app thread object. -        //  Returns true is successfull, false otherwise. -        bool make_current (); -          //  Processes commands sent to this thread (if any). If 'block' is          //  set to true, returns only after at least one command was processed.          //  If throttle argument is true, commands are processed at most once @@ -71,13 +59,6 @@ namespace zmq          typedef yarray_t <socket_base_t> sockets_t;          sockets_t sockets; -        //  If false, app_thread_t object is not associated with any OS thread. -        //  In such case, 'tid' member contains a bogus value. -        bool associated; - -        //  Thread ID associated with this slot. -        thread_t::id_t tid; -          //  App thread's signaler object.          struct i_signaler *signaler; diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 167b823..8aafcf8 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -51,11 +51,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,      //  Create application thread proxies.      for (int i = 0; i != app_threads_; i++) { -        app_thread_t *app_thread = new (std::nothrow) app_thread_t (this, i, -            flags_); -        zmq_assert (app_thread); -        app_threads.push_back (app_thread); -        signalers.push_back (app_thread->get_signaler ()); +        app_thread_info_t info; +        info.associated = false; +        info.app_thread = new (std::nothrow) app_thread_t (this, i, flags_); +        zmq_assert (info.app_thread); +        app_threads.push_back (info); +        signalers.push_back (info.app_thread->get_signaler ());      }      //  Create I/O thread objects. @@ -110,7 +111,7 @@ zmq::dispatcher_t::~dispatcher_t ()      //  Close all application theads, sockets, io_objects etc.      for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) -        delete app_threads [i]; +        delete app_threads [i].app_thread;      //  Deallocate all the orphaned pipes.      while (!pipes.empty ()) @@ -132,13 +133,37 @@ int zmq::dispatcher_t::thread_slot_count ()  zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)  { -    threads_sync.lock (); -    app_thread_t *thread = choose_app_thread (); -    if (!thread) { -        threads_sync.unlock (); -        return NULL; +    app_threads_sync.lock (); + +    //  Find whether the calling thread has app_thread_t object associated +    //  already. At the same time find an unused app_thread_t so that it can +    //  be used if there's no associated object for the calling thread. +    //  Check whether thread ID is already assigned. If so, return it. +    app_threads_t::size_type unused = app_threads.size (); +    app_threads_t::size_type current; +    for (current = 0; current != app_threads.size (); current++) { +        if (app_threads [current].associated && +              thread_t::equal (thread_t::id (), app_threads [current].tid)) +            break; +        if (!app_threads [current].associated) +            unused = current; +    } + +    //  If no app_thread_t is associated with the calling thread, +    //  associate it with one of the unused app_thread_t objects. +    if (current == app_threads.size ()) { +        if (unused == app_threads.size ()) { +            app_threads_sync.unlock (); +            errno = EMTHREAD; +            return NULL; +        } +        app_threads [unused].associated = true; +        app_threads [unused].tid = thread_t::id (); +        current = unused;      } -    threads_sync.unlock (); + +    app_thread_t *thread = app_threads [current].app_thread; +    app_threads_sync.unlock ();      socket_base_t *s = thread->create_socket (type_);      if (!s) @@ -165,6 +190,19 @@ void zmq::dispatcher_t::destroy_socket ()         delete this;  } +void zmq::dispatcher_t::no_sockets (app_thread_t *thread_) +{ +    app_threads_sync.lock (); +    app_threads_t::size_type i; +    for (i = 0; i != app_threads.size (); i++) +        if (app_threads [i].app_thread == thread_) { +            app_threads [i].associated = false; +            break; +        } +    zmq_assert (i != app_threads.size ()); +    app_threads_sync.unlock (); +} +  void zmq::dispatcher_t::write (int source_, int destination_,      const command_t &command_)  { @@ -182,23 +220,6 @@ bool zmq::dispatcher_t::read (int source_,  int destination_,          destination_].read (command_);  } -zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () -{ -    //  Check whether thread ID is already assigned. If so, return it. -    for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) -        if (app_threads [i]->is_current ()) -            return app_threads [i]; - -    //  Check whether there's an unused thread slot in the cotext. -    for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) -        if (app_threads [i]->make_current ()) -            return app_threads [i]; - -    //  Thread limit was exceeded. -    errno = EMTHREAD; -    return NULL; -} -  zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_)  {      //  Find the I/O thread with minimum load. diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 2c11c57..1cd5207 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -31,6 +31,7 @@  #include "config.hpp"  #include "mutex.hpp"  #include "stdint.hpp" +#include "thread.hpp"  namespace zmq  { @@ -64,6 +65,10 @@ namespace zmq          //  Destroy a socket.          void destroy_socket (); +        //  Called by app_thread_t when it has no more sockets. The function +        //  should disassociate the object from the current OS thread. +        void no_sockets (class app_thread_t *thread_); +          //  Returns number of thread slots in the dispatcher. To be used by          //  individual threads to find out how many distinct signals can be          //  received. @@ -94,14 +99,27 @@ namespace zmq          ~dispatcher_t (); -        //  Returns the app thread associated with the current thread. -        //  NULL if we are out of app thread slots. -        class app_thread_t *choose_app_thread (); +        struct app_thread_info_t +        { +            //  If false, 0MQ application thread is free, there's no associated +            //  OS thread. +            bool associated; + +            //  ID of the associated OS thread. If 'associated' is false, +            //  this field contains bogus data. +            thread_t::id_t tid; + +            //  Pointer to the 0MQ application thread object. +            class app_thread_t *app_thread; +        };          //  Application threads. -        typedef std::vector <class app_thread_t*> app_threads_t; +        typedef std::vector <app_thread_info_t> app_threads_t;          app_threads_t app_threads; +        //  Synchronisation of accesses to shared application thread data. +        mutex_t app_threads_sync; +          //  I/O threads.          typedef std::vector <class io_thread_t*> io_threads_t;          io_threads_t io_threads; @@ -116,9 +134,6 @@ namespace zmq          //  NxN matrix of command pipes.          command_pipe_t *command_pipes; -        //  Synchronisation of accesses to shared thread data. -        mutex_t threads_sync; -          //  As pipes may reside in orphaned state in particular moments          //  of the pipe shutdown process, i.e. neither pipe reader nor          //  pipe writer hold reference to the pipe, we have to hold references  | 
