summaryrefslogtreecommitdiff
path: root/src/ctx.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/ctx.cpp')
-rw-r--r--src/ctx.cpp22
1 files changed, 19 insertions, 3 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 54e665a..3ef2d4a 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -32,6 +32,7 @@
#include "ctx.hpp"
#include "socket_base.hpp"
#include "io_thread.hpp"
+#include "monitor.hpp"
#include "reaper.hpp"
#include "pipe.hpp"
#include "err.hpp"
@@ -79,6 +80,13 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
zmq_assert (log_socket);
rc = log_socket->bind ("sys://log");
zmq_assert (rc == 0);
+
+ // Create the monitor object.
+ io_thread_t *io_thread = choose_io_thread (0);
+ zmq_assert (io_thread);
+ monitor = new (std::nothrow) monitor_t (io_thread);
+ alloc_assert (monitor);
+ monitor->start ();
}
bool zmq::ctx_t::check_tag ()
@@ -123,15 +131,23 @@ int zmq::ctx_t::terminate ()
// First attempt to terminate the context.
if (!restarted) {
+ // Close the monitor object. Wait for done command from the monitor.
+ monitor->stop ();
+ command_t cmd;
+ int rc = term_mailbox.recv (&cmd, -1);
+ zmq_assert (rc == 0);
+ zmq_assert (cmd.type == command_t::done);
+
// Close the logging infrastructure.
log_sync.lock ();
- int rc = log_socket->close ();
+ rc = log_socket->close ();
zmq_assert (rc == 0);
log_socket = NULL;
log_sync.unlock ();
- // First send stop command to sockets so that any blocking calls can be
- // interrupted. If there are no sockets we can ask reaper thread to stop.
+ // First send stop command to sockets so that any blocking calls
+ // can be interrupted. If there are no sockets we can ask reaper
+ // thread to stop.
slot_sync.lock ();
terminating = true;
for (sockets_t::size_type i = 0; i != sockets.size (); i++)