From 80ac398bba928fa7f245d2e107071677a13185cf Mon Sep 17 00:00:00 2001
From: Martin Sustrik
Date: Wed, 9 Feb 2011 15:32:15 +0100
Subject: Initial implementation of reaper thread.

Reaper thread destroys the socket asynchronously.
zmq_term() can be interrupted by a signal (EINTR).
zmq_socket() will return ETERM after zmq_term() was called.

Signed-off-by: Martin Sustrik
---
 src/ctx.cpp | 158 ++++++++++++++++++++++++++++--------------------------------
 1 file changed, 73 insertions(+), 85 deletions(-)

(limited to 'src/ctx.cpp')

diff --git a/src/ctx.cpp b/src/ctx.cpp
index 91b0236..10c91c9 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -24,6 +24,7 @@
 #include "socket_base.hpp"
 #include "io_thread.hpp"
 #include "platform.hpp"
+#include "reaper.hpp"
 #include "err.hpp"
 #include "pipe.hpp"
 
@@ -34,7 +35,7 @@
 #endif
 
 zmq::ctx_t::ctx_t (uint32_t io_threads_) :
-    no_sockets_notify (false)
+    terminating (false)
 {
     int rc;
 
@@ -49,13 +50,23 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
         HIBYTE (wsa_data.wVersion) == 2);
 #endif
 
-    //  Initialise the array of mailboxes. +1 accounts for internal log socket.
-    slot_count = max_sockets + io_threads_ + 1;
+    //  Initialise the array of mailboxes. Three additional slots are for the
+    //  internal log socket, the zmq_term thread and the reaper thread.
+    slot_count = max_sockets + io_threads_ + 3;
     slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
     zmq_assert (slots);
 
+    //  Initialise the infrastructure for zmq_term thread.
+    slots [term_tid] = &term_mailbox;
+
+    //  Create the reaper thread.
+    reaper = new (std::nothrow) reaper_t (this, reaper_tid);
+    zmq_assert (reaper);
+    slots [reaper_tid] = reaper->get_mailbox ();
+    reaper->start ();
+
     //  Create I/O thread objects and launch them.
-    for (uint32_t i = 0; i != io_threads_; i++) {
+    for (uint32_t i = 2; i != io_threads_ + 2; i++) {
         io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
         zmq_assert (io_thread);
         io_threads.push_back (io_thread);
@@ -65,7 +76,7 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
 
     //  In the unused part of the slot array, create a list of empty slots.
     for (int32_t i = (int32_t) slot_count - 1;
-          i >= (int32_t) io_threads_; i--) {
+          i >= (int32_t) io_threads_ + 2; i--) {
         empty_slots.push_back (i);
         slots [i] = NULL;
     }
@@ -79,9 +90,8 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
 
 zmq::ctx_t::~ctx_t ()
 {
-    //  Check that there are no remaining open or zombie sockets.
+    //  Check that there are no remaining sockets.
     zmq_assert (sockets.empty ());
-    zmq_assert (zombies.empty ());
 
     //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O
     //  thread subsequent invocation of destructor would hang-up.
@@ -92,6 +102,9 @@ zmq::ctx_t::~ctx_t ()
     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
         delete io_threads [i];
 
+    //  Deallocate the reaper thread object.
+    delete reaper;
+
     //  Deallocate the array of mailboxes. No special work is
     //  needed as mailboxes themselves were deallocated with their
     //  corresponding io_thread/socket objects.
@@ -106,52 +119,42 @@ zmq::ctx_t::~ctx_t ()
 
 int zmq::ctx_t::terminate ()
 {
-    //  Close the logging infrastructure.
-    log_sync.lock ();
-    int rc = log_socket->close ();
-    zmq_assert (rc == 0);
-    log_socket = NULL;
-    log_sync.unlock ();
-
-    //  First send stop command to sockets so that any
-    //  blocking calls are interrupted.
+    //  Check whether termination was already underway, but interrupted and now
+    //  restarted.
     slot_sync.lock ();
-    for (sockets_t::size_type i = 0; i != sockets.size (); i++)
-        sockets [i]->stop ();
-    if (!sockets.empty ())
-        no_sockets_notify = true;
+    bool restarted = terminating;
     slot_sync.unlock ();
 
-    //  Find out whether there are any open sockets to care about.
-    //  If there are open sockets, sleep till they are closed. Note that we can
-    //  use no_sockets_notify safely out of the critical section as once set
-    //  its value is never changed again.
-    if (no_sockets_notify)
-        no_sockets_sync.wait ();
+    //  First attempt to terminate the context.
+    if (!restarted) {
+
+        //  Close the logging infrastructure.
+        log_sync.lock ();
+        int rc = log_socket->close ();
+        zmq_assert (rc == 0);
+        log_socket = NULL;
+        log_sync.unlock ();
+
+        //  First send stop command to sockets so that any blocking calls can be
+        //  interrupted. If there are no sockets, we can ask reaper thread to stop.
+        slot_sync.lock ();
+        terminating = true;
+        for (sockets_t::size_type i = 0; i != sockets.size (); i++)
+            sockets [i]->stop ();
+        if (sockets.empty ())
+            reaper->stop ();
+        slot_sync.unlock ();
+    }
 
-    //  Note that the lock won't block anyone here. There's noone else having
-    //  open sockets anyway. The only purpose of the lock is to double-check all
-    //  the CPU caches have been synchronised.
+    //  Wait till reaper thread closes all the sockets.
+    command_t cmd;
+    int rc = term_mailbox.recv (&cmd, true);
+    if (rc == -1 && errno == EINTR)
+        return -1;
+    zmq_assert (rc == 0);
+    zmq_assert (cmd.type == command_t::done);
     slot_sync.lock ();
-
-    //  At this point there should be no active sockets. What we have is a set
-    //  of zombies waiting to be dezombified.
     zmq_assert (sockets.empty ());
-
-    //  Get rid of remaining zombie sockets.
-    while (!zombies.empty ()) {
-        dezombify ();
-
-        //  Sleep for 1ms not to end up busy-looping in the case the I/O threads
-        //  are still busy sending data. We can possibly add a grand poll here
-        //  (polling for fds associated with all the zombie sockets), but it's
-        //  probably not worth of implementing it.
-#if defined ZMQ_HAVE_WINDOWS
-        Sleep (1);
-#else
-        usleep (1000);
-#endif
-    }
     slot_sync.unlock ();
 
     //  Deallocate the resources.
@@ -164,8 +167,12 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
 {
     slot_sync.lock ();
 
-    //  Free the slots, if possible.
-    dezombify ();
+    //  Once zmq_term() was called, we can't create new sockets.
+    if (terminating) {
+        slot_sync.unlock ();
+        errno = ETERM;
+        return NULL;
+    }
 
     //  If max_sockets limit was reached, return error.
     if (empty_slots.empty ()) {
@@ -193,29 +200,31 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
     return s;
 }
 
-void zmq::ctx_t::zombify_socket (socket_base_t *socket_)
+void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
 {
-    //  Zombification of socket basically means that its ownership is tranferred
-    //  from the application that created it to the context.
-
-    //  Note that the lock provides the memory barrier needed to migrate
-    //  zombie-to-be socket from it's native thread to shared data area
-    //  synchronised by slot_sync.
     slot_sync.lock ();
-    sockets.erase (socket_);
-    zombies.push_back (socket_);
 
-    //  Try to get rid of at least some zombie sockets at this point.
-    dezombify ();
+    //  Free the associated thread slot.
+    uint32_t tid = socket_->get_tid ();
+    empty_slots.push_back (tid);
+    slots [tid] = NULL;
 
-    //  If shutdown thread is interested in notification about no more
-    //  open sockets, notify it now.
-    if (sockets.empty () && no_sockets_notify)
-        no_sockets_sync.post ();
+    //  Remove the socket from the list of sockets.
+    sockets.erase (socket_);
+
+    //  If zmq_term() was already called and there are no more sockets,
+    //  we can ask reaper thread to terminate.
+    if (terminating && sockets.empty ())
+        reaper->stop ();
 
     slot_sync.unlock ();
 }
 
+zmq::object_t *zmq::ctx_t::get_reaper ()
+{
+    return reaper;
+}
+
 void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
 {
     slots [tid_]->send (command_);
@@ -309,25 +318,4 @@ void zmq::ctx_t::log (zmq_msg_t *msg_)
     log_sync.unlock ();
 }
 
-void zmq::ctx_t::dezombify ()
-{
-    //  Try to dezombify each zombie in the list. Note that caller is
-    //  responsible for calling this method in the slot_sync critical section.
-    for (zombies_t::iterator it = zombies.begin (); it != zombies.end ();) {
-        uint32_t tid = (*it)->get_tid ();
-        if ((*it)->dezombify ()) {
-#if defined _MSC_VER
-
-            //  HP implementation of STL requires doing it this way...
-            it = zombies.erase (it);
-#else
-            zombies.erase (it);
-#endif
-            empty_slots.push_back (tid);
-            slots [tid] = NULL;
-        }
-        else
-            ++it;
-    }
-}
--
cgit v1.2.3
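
The commit message promises two application-visible changes: zmq_term() can now fail with EINTR when a signal arrives while it waits for the reaper, and zmq_socket() refuses new sockets with ETERM once termination has started. The sketch below is not part of the patch; it only illustrates how a client program might observe both behaviours, assuming the C API of this libzmq generation (zmq_init/zmq_term) and using a helper thread plus a short sleep purely as scaffolding.

//  Illustrative sketch, not libzmq code. Assumes zmq.h from this era of the
//  library; the helper thread and sleep only exist to make the ordering clear.
#include <zmq.h>
#include <cerrno>
#include <cstdio>
#include <thread>
#include <chrono>

int main ()
{
    void *ctx = zmq_init (1);
    if (!ctx)
        return 1;
    void *sock = zmq_socket (ctx, ZMQ_PUB);

    std::thread worker ([&] () {
        //  Give the main thread time to enter zmq_term() and mark the
        //  context as terminating.
        std::this_thread::sleep_for (std::chrono::milliseconds (100));

        //  Once termination has started, new sockets are refused with ETERM.
        void *late = zmq_socket (ctx, ZMQ_PUB);
        if (!late && errno == ETERM)
            std::printf ("zmq_socket refused: context is terminating\n");

        //  zmq_term() keeps blocking until every socket is closed; the
        //  thread-creation handoff provides the barrier needed to close
        //  the socket here.
        zmq_close (sock);
    });

    //  zmq_term() blocks until the reaper thread has destroyed all sockets.
    //  If a signal interrupts the wait it returns -1 with errno == EINTR and
    //  can simply be called again.
    while (zmq_term (ctx) != 0 && errno == EINTR)
        continue;

    worker.join ();
    return 0;
}

Closing the last socket from the worker is what lets the reaper finish its work and unblocks zmq_term() in the main thread.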
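For readers skimming the terminate()/destroy_socket() changes, the core of the patch is a handshake: zmq_term() parks on term_mailbox until the reaper thread, having destroyed the last socket, posts a done command, and an interrupting signal surfaces as EINTR so the application can call zmq_term() again. The toy program below is a stand-in for that handshake, not libzmq code: the mailbox here is just a condition variable, the "sockets" are plain integers, and the EINTR path is only described in comments.

//  Toy model of the zmq_term()/reaper handshake using C++11 primitives.
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
#include <cstdio>

struct toy_mailbox_t {
    std::mutex sync;
    std::condition_variable cond;
    bool done = false;

    void send_done () {
        std::lock_guard<std::mutex> lock (sync);
        done = true;
        cond.notify_one ();
    }
    void recv_done () {
        std::unique_lock<std::mutex> lock (sync);
        cond.wait (lock, [this] { return done; });
    }
};

int main ()
{
    toy_mailbox_t term_mailbox;
    std::vector<int> sockets = {1, 2, 3};   //  Pretend open sockets.

    //  "Reaper": destroys sockets one by one, then reports completion,
    //  playing the role of reaper_t sending command_t::done.
    std::thread reaper ([&] () {
        while (!sockets.empty ()) {
            std::printf ("reaping socket %d\n", sockets.back ());
            sockets.pop_back ();
        }
        term_mailbox.send_done ();
    });

    //  "zmq_term()": blocks until the done command arrives. In the real code
    //  a signal makes the mailbox recv fail with EINTR and terminate()
    //  returns -1 so the application can retry.
    term_mailbox.recv_done ();
    std::printf ("all sockets reaped, context can be deallocated\n");

    reaper.join ();
    return 0;
}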