summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/app_thread.cpp23
-rw-r--r--src/app_thread.hpp19
-rw-r--r--src/dispatcher.cpp79
-rw-r--r--src/dispatcher.hpp29
4 files changed, 76 insertions, 74 deletions
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index c55eb75..eff0cf7 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -62,7 +62,6 @@
zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
int flags_) :
object_t (dispatcher_, thread_slot_),
- associated (false),
last_processing_time (0)
{
if (flags_ & ZMQ_POLL) {
@@ -87,24 +86,6 @@ zmq::i_signaler *zmq::app_thread_t::get_signaler ()
return signaler;
}
-bool zmq::app_thread_t::is_current ()
-{
- return !sockets.empty () && associated &&
- thread_t::equal (tid, thread_t::id ());
-}
-
-bool zmq::app_thread_t::make_current ()
-{
- // If there are object managed by this slot we cannot assign the slot
- // to a different thread.
- if (!sockets.empty ())
- return false;
-
- associated = true;
- tid = thread_t::id ();
- return true;
-}
-
void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
{
uint64_t signals;
@@ -191,6 +172,8 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
s = new (std::nothrow) downstream_t (this);
break;
default:
+ if (sockets.empty ())
+ dispatcher->no_sockets (this);
errno = EINVAL;
return NULL;
}
@@ -204,4 +187,6 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
void zmq::app_thread_t::remove_socket (socket_base_t *socket_)
{
sockets.erase (socket_);
+ if (sockets.empty ())
+ dispatcher->no_sockets (this);
}
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index 0e2c3e1..9bd5641 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -25,7 +25,6 @@
#include "stdint.hpp"
#include "object.hpp"
#include "yarray.hpp"
-#include "thread.hpp"
namespace zmq
{
@@ -42,17 +41,6 @@ namespace zmq
// Returns signaler associated with this application thread.
struct i_signaler *get_signaler ();
- // Nota bene: Following two functions are accessed from different
- // threads. The caller (dispatcher) is responsible for synchronisation
- // of accesses.
-
- // Returns true is current thread is associated with the app thread.
- bool is_current ();
-
- // Tries to associate current thread with the app thread object.
- // Returns true is successfull, false otherwise.
- bool make_current ();
-
// Processes commands sent to this thread (if any). If 'block' is
// set to true, returns only after at least one command was processed.
// If throttle argument is true, commands are processed at most once
@@ -71,13 +59,6 @@ namespace zmq
typedef yarray_t <socket_base_t> sockets_t;
sockets_t sockets;
- // If false, app_thread_t object is not associated with any OS thread.
- // In such case, 'tid' member contains a bogus value.
- bool associated;
-
- // Thread ID associated with this slot.
- thread_t::id_t tid;
-
// App thread's signaler object.
struct i_signaler *signaler;
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 167b823..8aafcf8 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -51,11 +51,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
// Create application thread proxies.
for (int i = 0; i != app_threads_; i++) {
- app_thread_t *app_thread = new (std::nothrow) app_thread_t (this, i,
- flags_);
- zmq_assert (app_thread);
- app_threads.push_back (app_thread);
- signalers.push_back (app_thread->get_signaler ());
+ app_thread_info_t info;
+ info.associated = false;
+ info.app_thread = new (std::nothrow) app_thread_t (this, i, flags_);
+ zmq_assert (info.app_thread);
+ app_threads.push_back (info);
+ signalers.push_back (info.app_thread->get_signaler ());
}
// Create I/O thread objects.
@@ -110,7 +111,7 @@ zmq::dispatcher_t::~dispatcher_t ()
// Close all application theads, sockets, io_objects etc.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
- delete app_threads [i];
+ delete app_threads [i].app_thread;
// Deallocate all the orphaned pipes.
while (!pipes.empty ())
@@ -132,13 +133,37 @@ int zmq::dispatcher_t::thread_slot_count ()
zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
{
- threads_sync.lock ();
- app_thread_t *thread = choose_app_thread ();
- if (!thread) {
- threads_sync.unlock ();
- return NULL;
+ app_threads_sync.lock ();
+
+ // Find whether the calling thread has app_thread_t object associated
+ // already. At the same time find an unused app_thread_t so that it can
+ // be used if there's no associated object for the calling thread.
+ // Check whether thread ID is already assigned. If so, return it.
+ app_threads_t::size_type unused = app_threads.size ();
+ app_threads_t::size_type current;
+ for (current = 0; current != app_threads.size (); current++) {
+ if (app_threads [current].associated &&
+ thread_t::equal (thread_t::id (), app_threads [current].tid))
+ break;
+ if (!app_threads [current].associated)
+ unused = current;
+ }
+
+ // If no app_thread_t is associated with the calling thread,
+ // associate it with one of the unused app_thread_t objects.
+ if (current == app_threads.size ()) {
+ if (unused == app_threads.size ()) {
+ app_threads_sync.unlock ();
+ errno = EMTHREAD;
+ return NULL;
+ }
+ app_threads [unused].associated = true;
+ app_threads [unused].tid = thread_t::id ();
+ current = unused;
}
- threads_sync.unlock ();
+
+ app_thread_t *thread = app_threads [current].app_thread;
+ app_threads_sync.unlock ();
socket_base_t *s = thread->create_socket (type_);
if (!s)
@@ -165,6 +190,19 @@ void zmq::dispatcher_t::destroy_socket ()
delete this;
}
+void zmq::dispatcher_t::no_sockets (app_thread_t *thread_)
+{
+ app_threads_sync.lock ();
+ app_threads_t::size_type i;
+ for (i = 0; i != app_threads.size (); i++)
+ if (app_threads [i].app_thread == thread_) {
+ app_threads [i].associated = false;
+ break;
+ }
+ zmq_assert (i != app_threads.size ());
+ app_threads_sync.unlock ();
+}
+
void zmq::dispatcher_t::write (int source_, int destination_,
const command_t &command_)
{
@@ -182,23 +220,6 @@ bool zmq::dispatcher_t::read (int source_, int destination_,
destination_].read (command_);
}
-zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()
-{
- // Check whether thread ID is already assigned. If so, return it.
- for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
- if (app_threads [i]->is_current ())
- return app_threads [i];
-
- // Check whether there's an unused thread slot in the cotext.
- for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
- if (app_threads [i]->make_current ())
- return app_threads [i];
-
- // Thread limit was exceeded.
- errno = EMTHREAD;
- return NULL;
-}
-
zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_)
{
// Find the I/O thread with minimum load.
diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp
index 2c11c57..1cd5207 100644
--- a/src/dispatcher.hpp
+++ b/src/dispatcher.hpp
@@ -31,6 +31,7 @@
#include "config.hpp"
#include "mutex.hpp"
#include "stdint.hpp"
+#include "thread.hpp"
namespace zmq
{
@@ -64,6 +65,10 @@ namespace zmq
// Destroy a socket.
void destroy_socket ();
+ // Called by app_thread_t when it has no more sockets. The function
+ // should disassociate the object from the current OS thread.
+ void no_sockets (class app_thread_t *thread_);
+
// Returns number of thread slots in the dispatcher. To be used by
// individual threads to find out how many distinct signals can be
// received.
@@ -94,14 +99,27 @@ namespace zmq
~dispatcher_t ();
- // Returns the app thread associated with the current thread.
- // NULL if we are out of app thread slots.
- class app_thread_t *choose_app_thread ();
+ struct app_thread_info_t
+ {
+ // If false, 0MQ application thread is free, there's no associated
+ // OS thread.
+ bool associated;
+
+ // ID of the associated OS thread. If 'associated' is false,
+ // this field contains bogus data.
+ thread_t::id_t tid;
+
+ // Pointer to the 0MQ application thread object.
+ class app_thread_t *app_thread;
+ };
// Application threads.
- typedef std::vector <class app_thread_t*> app_threads_t;
+ typedef std::vector <app_thread_info_t> app_threads_t;
app_threads_t app_threads;
+ // Synchronisation of accesses to shared application thread data.
+ mutex_t app_threads_sync;
+
// I/O threads.
typedef std::vector <class io_thread_t*> io_threads_t;
io_threads_t io_threads;
@@ -116,9 +134,6 @@ namespace zmq
// NxN matrix of command pipes.
command_pipe_t *command_pipes;
- // Synchronisation of accesses to shared thread data.
- mutex_t threads_sync;
-
// As pipes may reside in orphaned state in particular moments
// of the pipe shutdown process, i.e. neither pipe reader nor
// pipe writer hold reference to the pipe, we have to hold references