diff options
Diffstat (limited to 'src/ctx.cpp')
-rw-r--r-- | src/ctx.cpp | 22 |
1 files changed, 19 insertions, 3 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp index 54e665a..3ef2d4a 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -32,6 +32,7 @@ #include "ctx.hpp" #include "socket_base.hpp" #include "io_thread.hpp" +#include "monitor.hpp" #include "reaper.hpp" #include "pipe.hpp" #include "err.hpp" @@ -79,6 +80,13 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : zmq_assert (log_socket); rc = log_socket->bind ("sys://log"); zmq_assert (rc == 0); + + // Create the monitor object. + io_thread_t *io_thread = choose_io_thread (0); + zmq_assert (io_thread); + monitor = new (std::nothrow) monitor_t (io_thread); + alloc_assert (monitor); + monitor->start (); } bool zmq::ctx_t::check_tag () @@ -123,15 +131,23 @@ int zmq::ctx_t::terminate () // First attempt to terminate the context. if (!restarted) { + // Close the monitor object. Wait for done command from the monitor. + monitor->stop (); + command_t cmd; + int rc = term_mailbox.recv (&cmd, -1); + zmq_assert (rc == 0); + zmq_assert (cmd.type == command_t::done); + // Close the logging infrastructure. log_sync.lock (); - int rc = log_socket->close (); + rc = log_socket->close (); zmq_assert (rc == 0); log_socket = NULL; log_sync.unlock (); - // First send stop command to sockets so that any blocking calls can be - // interrupted. If there are no sockets we can ask reaper thread to stop. + // First send stop command to sockets so that any blocking calls + // can be interrupted. If there are no sockets we can ask reaper + // thread to stop. slot_sync.lock (); terminating = true; for (sockets_t::size_type i = 0; i != sockets.size (); i++) |