summaryrefslogtreecommitdiff
path: root/src/dispatcher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/dispatcher.cpp')
-rw-r--r--src/dispatcher.cpp79
1 files changed, 50 insertions, 29 deletions
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 167b823..8aafcf8 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -51,11 +51,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
// Create application thread proxies.
for (int i = 0; i != app_threads_; i++) {
- app_thread_t *app_thread = new (std::nothrow) app_thread_t (this, i,
- flags_);
- zmq_assert (app_thread);
- app_threads.push_back (app_thread);
- signalers.push_back (app_thread->get_signaler ());
+ app_thread_info_t info;
+ info.associated = false;
+ info.app_thread = new (std::nothrow) app_thread_t (this, i, flags_);
+ zmq_assert (info.app_thread);
+ app_threads.push_back (info);
+ signalers.push_back (info.app_thread->get_signaler ());
}
// Create I/O thread objects.
@@ -110,7 +111,7 @@ zmq::dispatcher_t::~dispatcher_t ()
// Close all application theads, sockets, io_objects etc.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
- delete app_threads [i];
+ delete app_threads [i].app_thread;
// Deallocate all the orphaned pipes.
while (!pipes.empty ())
@@ -132,13 +133,37 @@ int zmq::dispatcher_t::thread_slot_count ()
zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
{
- threads_sync.lock ();
- app_thread_t *thread = choose_app_thread ();
- if (!thread) {
- threads_sync.unlock ();
- return NULL;
+ app_threads_sync.lock ();
+
+ // Find whether the calling thread has app_thread_t object associated
+ // already. At the same time find an unused app_thread_t so that it can
+ // be used if there's no associated object for the calling thread.
+ // Check whether thread ID is already assigned. If so, return it.
+ app_threads_t::size_type unused = app_threads.size ();
+ app_threads_t::size_type current;
+ for (current = 0; current != app_threads.size (); current++) {
+ if (app_threads [current].associated &&
+ thread_t::equal (thread_t::id (), app_threads [current].tid))
+ break;
+ if (!app_threads [current].associated)
+ unused = current;
+ }
+
+ // If no app_thread_t is associated with the calling thread,
+ // associate it with one of the unused app_thread_t objects.
+ if (current == app_threads.size ()) {
+ if (unused == app_threads.size ()) {
+ app_threads_sync.unlock ();
+ errno = EMTHREAD;
+ return NULL;
+ }
+ app_threads [unused].associated = true;
+ app_threads [unused].tid = thread_t::id ();
+ current = unused;
}
- threads_sync.unlock ();
+
+ app_thread_t *thread = app_threads [current].app_thread;
+ app_threads_sync.unlock ();
socket_base_t *s = thread->create_socket (type_);
if (!s)
@@ -165,6 +190,19 @@ void zmq::dispatcher_t::destroy_socket ()
delete this;
}
+void zmq::dispatcher_t::no_sockets (app_thread_t *thread_)
+{
+ app_threads_sync.lock ();
+ app_threads_t::size_type i;
+ for (i = 0; i != app_threads.size (); i++)
+ if (app_threads [i].app_thread == thread_) {
+ app_threads [i].associated = false;
+ break;
+ }
+ zmq_assert (i != app_threads.size ());
+ app_threads_sync.unlock ();
+}
+
void zmq::dispatcher_t::write (int source_, int destination_,
const command_t &command_)
{
@@ -182,23 +220,6 @@ bool zmq::dispatcher_t::read (int source_, int destination_,
destination_].read (command_);
}
-zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()
-{
- // Check whether thread ID is already assigned. If so, return it.
- for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
- if (app_threads [i]->is_current ())
- return app_threads [i];
-
- // Check whether there's an unused thread slot in the cotext.
- for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
- if (app_threads [i]->make_current ())
- return app_threads [i];
-
- // Thread limit was exceeded.
- errno = EMTHREAD;
- return NULL;
-}
-
zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_)
{
// Find the I/O thread with minimum load.