summaryrefslogtreecommitdiff
path: root/src/dispatcher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/dispatcher.cpp')
-rw-r--r--src/dispatcher.cpp36
1 files changed, 35 insertions, 1 deletions
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 71e20df..49c2197 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -30,7 +30,9 @@
#include "windows.h"
#endif
-zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_)
+zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
+ sockets (0),
+ terminated (false)
{
#ifdef ZMQ_HAVE_WINDOWS
// Intialise Windows sockets. Note that WSAStartup can be called multiple
@@ -68,6 +70,20 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_)
io_threads [i]->start ();
}
+int zmq::dispatcher_t::term ()
+{
+ term_sync.lock ();
+ zmq_assert (!terminated);
+ terminated = true;
+ bool destroy = (sockets == 0);
+ term_sync.unlock ();
+
+ if (destroy)
+ delete this;
+
+ return 0;
+}
+
zmq::dispatcher_t::~dispatcher_t ()
{
// Close all application theads, sockets, io_objects etc.
@@ -111,9 +127,27 @@ zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
}
threads_sync.unlock ();
+ term_sync.lock ();
+ sockets++;
+ term_sync.unlock ();
+
return thread->create_socket (type_);
}
+void zmq::dispatcher_t::destroy_socket ()
+{
+ // If zmq_term was already called and there are no more sockets,
+ // terminate the whole 0MQ infrastructure.
+ term_sync.lock ();
+ zmq_assert (sockets > 0);
+ sockets--;
+ bool destroy = (sockets == 0 && terminated);
+ term_sync.unlock ();
+
+ if (destroy)
+ delete this;
+}
+
zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()
{
// Check whether thread ID is already assigned. If so, return it.