From 80ac398bba928fa7f245d2e107071677a13185cf Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 9 Feb 2011 15:32:15 +0100 Subject: Initial implementation of reaper thread. Reaper thread destroys the socket asynchronously. zmq_term() can be interrupted by a signal (EINTR). zmq_socket() will return ETERM after zmq_term() was called. Signed-off-by: Martin Sustrik --- doc/zmq_socket.txt | 3 +- doc/zmq_term.txt | 2 + src/Makefile.am | 2 + src/command.hpp | 15 ++++- src/ctx.cpp | 158 ++++++++++++++++++++++++---------------------------- src/ctx.hpp | 43 +++++++------- src/io_thread.cpp | 2 +- src/io_thread.hpp | 3 + src/object.cpp | 42 ++++++++++++-- src/object.hpp | 8 +-- src/reaper.cpp | 139 +++++++++++++++++++++++++++++++++++++++++++++ src/reaper.hpp | 81 +++++++++++++++++++++++++++ src/socket_base.cpp | 18 +++--- src/socket_base.hpp | 8 ++- 14 files changed, 394 insertions(+), 130 deletions(-) create mode 100644 src/reaper.cpp create mode 100644 src/reaper.hpp diff --git a/doc/zmq_socket.txt b/doc/zmq_socket.txt index 7ac8fa1..da41ad7 100644 --- a/doc/zmq_socket.txt +++ b/doc/zmq_socket.txt @@ -315,7 +315,8 @@ ERRORS The requested socket 'type' is invalid. *EFAULT*:: The provided 'context' was not valid (NULL). - +*ETERM*:: +The context specified was terminated. SEE ALSO -------- diff --git a/doc/zmq_term.txt b/doc/zmq_term.txt index 2edc765..d3914db 100644 --- a/doc/zmq_term.txt +++ b/doc/zmq_term.txt @@ -47,6 +47,8 @@ ERRORS ------ *EFAULT*:: The provided 'context' was not valid (NULL). +*EINTR*:: +Termination was interrupted by a signal. It can be restarted if needed. SEE ALSO diff --git a/src/Makefile.am b/src/Makefile.am index 9e362f2..b730f38 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -107,6 +107,7 @@ libzmq_la_SOURCES = \ pub.hpp \ pull.hpp \ push.hpp \ + reaper.hpp \ rep.hpp \ req.hpp \ select.hpp \ @@ -166,6 +167,7 @@ libzmq_la_SOURCES = \ poller_base.cpp \ pull.cpp \ push.cpp \ + reaper.cpp \ pub.cpp \ rep.cpp \ req.cpp \ diff --git a/src/command.hpp b/src/command.hpp index 31a0e54..ec0850d 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -45,7 +45,9 @@ namespace zmq pipe_term_ack, term_req, term, - term_ack + term_ack, + reap, + done } type; union { @@ -117,6 +119,17 @@ namespace zmq struct { } term_ack; + // Transfers the ownership of the closed socket + // to the reaper thread. + struct { + class socket_base_t *socket; + } reap; + + // Sent by reaper thread to the term thread when all the sockets + // are successfully deallocated. + struct { + } done; + } args; }; diff --git a/src/ctx.cpp b/src/ctx.cpp index 91b0236..10c91c9 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -24,6 +24,7 @@ #include "socket_base.hpp" #include "io_thread.hpp" #include "platform.hpp" +#include "reaper.hpp" #include "err.hpp" #include "pipe.hpp" @@ -34,7 +35,7 @@ #endif zmq::ctx_t::ctx_t (uint32_t io_threads_) : - no_sockets_notify (false) + terminating (false) { int rc; @@ -49,13 +50,23 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : HIBYTE (wsa_data.wVersion) == 2); #endif - // Initialise the array of mailboxes. +1 accounts for internal log socket. - slot_count = max_sockets + io_threads_ + 1; + // Initialise the array of mailboxes. Additional three slots are for + // internal log socket and the zmq_term thread the reaper thread. + slot_count = max_sockets + io_threads_ + 3; slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); zmq_assert (slots); + // Initialise the infrastructure for zmq_term thread. + slots [term_tid] = &term_mailbox; + + // Create the reaper thread. + reaper = new (std::nothrow) reaper_t (this, reaper_tid); + zmq_assert (reaper); + slots [reaper_tid] = reaper->get_mailbox (); + reaper->start (); + // Create I/O thread objects and launch them. - for (uint32_t i = 0; i != io_threads_; i++) { + for (uint32_t i = 2; i != io_threads_ + 2; i++) { io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); zmq_assert (io_thread); io_threads.push_back (io_thread); @@ -65,7 +76,7 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : // In the unused part of the slot array, create a list of empty slots. for (int32_t i = (int32_t) slot_count - 1; - i >= (int32_t) io_threads_; i--) { + i >= (int32_t) io_threads_ + 2; i--) { empty_slots.push_back (i); slots [i] = NULL; } @@ -79,9 +90,8 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : zmq::ctx_t::~ctx_t () { - // Check that there are no remaining open or zombie sockets. + // Check that there are no remaining sockets. zmq_assert (sockets.empty ()); - zmq_assert (zombies.empty ()); // Ask I/O threads to terminate. If stop signal wasn't sent to I/O // thread subsequent invocation of destructor would hang-up. @@ -92,6 +102,9 @@ zmq::ctx_t::~ctx_t () for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) delete io_threads [i]; + // Deallocate the reaper thread object. + delete reaper; + // Deallocate the array of mailboxes. No special work is // needed as mailboxes themselves were deallocated with their // corresponding io_thread/socket objects. @@ -106,52 +119,42 @@ zmq::ctx_t::~ctx_t () int zmq::ctx_t::terminate () { - // Close the logging infrastructure. - log_sync.lock (); - int rc = log_socket->close (); - zmq_assert (rc == 0); - log_socket = NULL; - log_sync.unlock (); - - // First send stop command to sockets so that any - // blocking calls are interrupted. + // Check whether termination was already underway, but interrupted and now + // restarted. slot_sync.lock (); - for (sockets_t::size_type i = 0; i != sockets.size (); i++) - sockets [i]->stop (); - if (!sockets.empty ()) - no_sockets_notify = true; + bool restarted = terminating; slot_sync.unlock (); - // Find out whether there are any open sockets to care about. - // If there are open sockets, sleep till they are closed. Note that we can - // use no_sockets_notify safely out of the critical section as once set - // its value is never changed again. - if (no_sockets_notify) - no_sockets_sync.wait (); + // First attempt to terminate the context. + if (!restarted) { + + // Close the logging infrastructure. + log_sync.lock (); + int rc = log_socket->close (); + zmq_assert (rc == 0); + log_socket = NULL; + log_sync.unlock (); + + // First send stop command to sockets so that any blocking calls can be + // interrupted. If there are no sockets we can ask reaper thread to stop. + slot_sync.lock (); + terminating = true; + for (sockets_t::size_type i = 0; i != sockets.size (); i++) + sockets [i]->stop (); + if (sockets.empty ()) + reaper->stop (); + slot_sync.unlock (); + } - // Note that the lock won't block anyone here. There's noone else having - // open sockets anyway. The only purpose of the lock is to double-check all - // the CPU caches have been synchronised. + // Wait till reaper thread closes all the sockets. + command_t cmd; + int rc = term_mailbox.recv (&cmd, true); + if (rc == -1 && errno == EINTR) + return -1; + zmq_assert (rc == 0); + zmq_assert (cmd.type == command_t::done); slot_sync.lock (); - - // At this point there should be no active sockets. What we have is a set - // of zombies waiting to be dezombified. zmq_assert (sockets.empty ()); - - // Get rid of remaining zombie sockets. - while (!zombies.empty ()) { - dezombify (); - - // Sleep for 1ms not to end up busy-looping in the case the I/O threads - // are still busy sending data. We can possibly add a grand poll here - // (polling for fds associated with all the zombie sockets), but it's - // probably not worth of implementing it. -#if defined ZMQ_HAVE_WINDOWS - Sleep (1); -#else - usleep (1000); -#endif - } slot_sync.unlock (); // Deallocate the resources. @@ -164,8 +167,12 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) { slot_sync.lock (); - // Free the slots, if possible. - dezombify (); + // Once zmq_term() was called, we can't create new sockets. + if (terminating) { + slot_sync.unlock (); + errno = ETERM; + return NULL; + } // If max_sockets limit was reached, return error. if (empty_slots.empty ()) { @@ -193,29 +200,31 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) return s; } -void zmq::ctx_t::zombify_socket (socket_base_t *socket_) +void zmq::ctx_t::destroy_socket (class socket_base_t *socket_) { - // Zombification of socket basically means that its ownership is tranferred - // from the application that created it to the context. - - // Note that the lock provides the memory barrier needed to migrate - // zombie-to-be socket from it's native thread to shared data area - // synchronised by slot_sync. slot_sync.lock (); - sockets.erase (socket_); - zombies.push_back (socket_); - // Try to get rid of at least some zombie sockets at this point. - dezombify (); + // Free the associared thread slot. + uint32_t tid = socket_->get_tid (); + empty_slots.push_back (tid); + slots [tid] = NULL; - // If shutdown thread is interested in notification about no more - // open sockets, notify it now. - if (sockets.empty () && no_sockets_notify) - no_sockets_sync.post (); + // Remove the socket from the list of sockets. + sockets.erase (socket_); + + // If zmq_term() was already called and there are no more socket + // we can ask reaper thread to terminate. + if (terminating && sockets.empty ()) + reaper->stop (); slot_sync.unlock (); } +zmq::object_t *zmq::ctx_t::get_reaper () +{ + return reaper; +} + void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_) { slots [tid_]->send (command_); @@ -309,25 +318,4 @@ void zmq::ctx_t::log (zmq_msg_t *msg_) log_sync.unlock (); } -void zmq::ctx_t::dezombify () -{ - // Try to dezombify each zombie in the list. Note that caller is - // responsible for calling this method in the slot_sync critical section. - for (zombies_t::iterator it = zombies.begin (); it != zombies.end ();) { - uint32_t tid = (*it)->get_tid (); - if ((*it)->dezombify ()) { -#if defined _MSC_VER - - // HP implementation of STL requires doing it this way... - it = zombies.erase (it); -#else - zombies.erase (it); -#endif - empty_slots.push_back (tid); - slots [tid] = NULL; - } - else - ++it; - } -} diff --git a/src/ctx.hpp b/src/ctx.hpp index 92684e6..c07711e 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -64,11 +64,9 @@ namespace zmq // after the last one is closed. int terminate (); - // Create a socket. + // Create and destroy a socket. class socket_base_t *create_socket (int type_); - - // Make socket a zombie. - void zombify_socket (socket_base_t *socket_); + void destroy_socket (class socket_base_t *socket_); // Send command to the destination thread. void send_command (uint32_t tid_, const command_t &command_); @@ -78,6 +76,9 @@ namespace zmq // Returns NULL is no I/O thread is available. class io_thread_t *choose_io_thread (uint64_t affinity_); + // Returns reaper thread object. + class object_t *get_reaper (); + // Management of inproc endpoints. int register_endpoint (const char *addr_, endpoint_t &endpoint_); void unregister_endpoints (class socket_base_t *socket_); @@ -86,41 +87,36 @@ namespace zmq // Logging. void log (zmq_msg_t *msg_); + enum { + term_tid = 0, + reaper_tid = 1 + }; + private: ~ctx_t (); - // Sockets belonging to this context. + // Sockets belonging to this context. We need the list so that + // we can notify the sockets when zmq_term() is called. The sockets + // will return ETERM then. typedef array_t sockets_t; sockets_t sockets; - // List of sockets that were already closed but not yet deallocated. - // These sockets still have some pipes and I/O objects attached. - typedef std::vector zombies_t; - zombies_t zombies; - // List of unused thread slots. typedef std::vector emtpy_slots_t; emtpy_slots_t empty_slots; - // If true, shutdown thread wants to be informed when there are no - // more open sockets. Do so by posting no_sockets_sync semaphore. - // Note that this variable is synchronised by slot_sync mutex. - bool no_sockets_notify; - - // Object used by zmq_term to wait while all the sockets are closed - // by different application threads. - semaphore_t no_sockets_sync; + // If true, zmq_term was already called. + bool terminating; // Synchronisation of accesses to global slot-related data: - // sockets, zombies, empty_slots, terminated. It also synchronises + // sockets, empty_slots, terminating. It also synchronises // access to zombie sockets as such (as oposed to slots) and provides // a memory barrier to ensure that all CPU cores see the same data. mutex_t slot_sync; - // This function attempts to deallocate as many zombie sockets as - // possible. It must be called within a slot_sync critical section. - void dezombify (); + // The reaper thread. + class reaper_t *reaper; // I/O threads. typedef std::vector io_threads_t; @@ -130,6 +126,9 @@ namespace zmq uint32_t slot_count; mailbox_t **slots; + // Mailbox for zmq_term thread. + mailbox_t term_mailbox; + // List of inproc endpoints within this context. typedef std::map endpoints_t; endpoints_t endpoints; diff --git a/src/io_thread.cpp b/src/io_thread.cpp index 7ba8905..ca768f5 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -75,7 +75,7 @@ void zmq::io_thread_t::in_event () if (rc != 0 && errno == EINTR) continue; if (rc != 0 && errno == EAGAIN) - break; + break; errno_assert (rc == 0); // Process the command. diff --git a/src/io_thread.hpp b/src/io_thread.hpp index b01eecb..b3ea484 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -77,6 +77,9 @@ namespace zmq // I/O multiplexing is performed using a poller object. poller_t *poller; + + io_thread_t (const io_thread_t&); + const io_thread_t &operator = (const io_thread_t&); }; } diff --git a/src/object.cpp b/src/object.cpp index 63b42b4..823d4b6 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -114,6 +114,10 @@ void zmq::object_t::process_command (command_t &cmd_) process_term_ack (); break; + case command_t::reap: + process_reap (cmd_.args.reap.socket); + break; + default: zmq_assert (false); } @@ -138,6 +142,11 @@ zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_) return ctx->find_endpoint (addr_); } +void zmq::object_t::destroy_socket (socket_base_t *socket_) +{ + ctx->destroy_socket (socket_); +} + void zmq::object_t::log (zmq_msg_t *msg_) { ctx->log (msg_); @@ -148,11 +157,6 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_) return ctx->choose_io_thread (affinity_); } -void zmq::object_t::zombify_socket (socket_base_t *socket_) -{ - ctx->zombify_socket (socket_); -} - void zmq::object_t::send_stop () { // 'stop' command goes always from administrative thread to @@ -336,6 +340,29 @@ void zmq::object_t::send_term_ack (own_t *destination_) send_command (cmd); } +void zmq::object_t::send_reap (class socket_base_t *socket_) +{ + command_t cmd; +#if defined ZMQ_MAKE_VALGRIND_HAPPY + memset (&cmd, 0, sizeof (cmd)); +#endif + cmd.destination = ctx->get_reaper (); + cmd.type = command_t::reap; + cmd.args.reap.socket = socket_; + send_command (cmd); +} + +void zmq::object_t::send_done () +{ + command_t cmd; +#if defined ZMQ_MAKE_VALGRIND_HAPPY + memset (&cmd, 0, sizeof (cmd)); +#endif + cmd.destination = NULL; + cmd.type = command_t::done; + ctx->send_command (ctx_t::term_tid, cmd); +} + void zmq::object_t::process_stop () { zmq_assert (false); @@ -398,6 +425,11 @@ void zmq::object_t::process_term_ack () zmq_assert (false); } +void zmq::object_t::process_reap (class socket_base_t *socket_) +{ + zmq_assert (false); +} + void zmq::object_t::process_seqnum () { zmq_assert (false); diff --git a/src/object.hpp b/src/object.hpp index 0fd6f47..cee82c8 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -49,6 +49,7 @@ namespace zmq int register_endpoint (const char *addr_, struct endpoint_t &endpoint_); void unregister_endpoints (class socket_base_t *socket_); struct endpoint_t find_endpoint (const char *addr_); + void destroy_socket (class socket_base_t *socket_); // Logs an message. void log (zmq_msg_t *msg_); @@ -56,10 +57,6 @@ namespace zmq // Chooses least loaded I/O thread. class io_thread_t *choose_io_thread (uint64_t affinity_); - // Zombify particular socket. In other words, pass the ownership to - // the context. - void zombify_socket (class socket_base_t *socket_); - // Derived object can use these functions to send commands // to other objects. void send_stop (); @@ -82,6 +79,8 @@ namespace zmq class own_t *object_); void send_term (class own_t *destination_, int linger_); void send_term_ack (class own_t *destination_); + void send_reap (class socket_base_t *socket_); + void send_done (); // These handlers can be overloaded by the derived objects. They are // called when command arrives from another thread. @@ -99,6 +98,7 @@ namespace zmq virtual void process_term_req (class own_t *object_); virtual void process_term (int linger_); virtual void process_term_ack (); + virtual void process_reap (class socket_base_t *socket_); // Special handler called after a command that requires a seqnum // was processed. The implementation should catch up with its counter diff --git a/src/reaper.cpp b/src/reaper.cpp new file mode 100644 index 0000000..f114f56 --- /dev/null +++ b/src/reaper.cpp @@ -0,0 +1,139 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "reaper.hpp" +#include "socket_base.hpp" +#include "err.hpp" + +zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) : + object_t (ctx_, tid_), + terminating (false), + has_timer (false) +{ + poller = new (std::nothrow) poller_t; + zmq_assert (poller); + + mailbox_handle = poller->add_fd (mailbox.get_fd (), this); + poller->set_pollin (mailbox_handle); +} + +zmq::reaper_t::~reaper_t () +{ + delete poller; +} + +zmq::mailbox_t *zmq::reaper_t::get_mailbox () +{ + return &mailbox; +} + +void zmq::reaper_t::start () +{ + // Start the thread. + poller->start (); +} + +void zmq::reaper_t::stop () +{ + send_stop (); +} + +void zmq::reaper_t::in_event () +{ + while (true) { + + // Get the next command. If there is none, exit. + command_t cmd; + int rc = mailbox.recv (&cmd, false); + if (rc != 0 && errno == EINTR) + continue; + if (rc != 0 && errno == EAGAIN) + break; + errno_assert (rc == 0); + + // Process the command. + cmd.destination->process_command (cmd); + } +} + +void zmq::reaper_t::out_event () +{ + // We are never polling for POLLOUT here. This function is never called. + zmq_assert (false); +} + +void zmq::reaper_t::timer_event (int id_) +{ + zmq_assert (has_timer); + has_timer = false; + reap (); +} + +void zmq::reaper_t::reap () +{ + // Try to reap each socket in the list. + for (sockets_t::iterator it = sockets.begin (); it != sockets.end ();) { + if ((*it)->reap ()) { + + // MSVC version of STL requires this to be done a spacial way... +#if defined _MSC_VER + it = sockets.erase (it); +#else + sockets.erase (it); +#endif + } + else + ++it; + } + + // If there are still sockets to reap, wait a while, then try again. + if (!sockets.empty () && !has_timer) { + poller->add_timer (1 , this, 0); + has_timer = true; + return; + } + + // No more sockets and the context is already shutting down. + if (terminating) { + send_done (); + poller->rm_fd (mailbox_handle); + poller->stop (); + return; + } +} + +void zmq::reaper_t::process_stop () +{ + terminating = true; + + if (sockets.empty ()) { + send_done (); + poller->rm_fd (mailbox_handle); + poller->stop (); + } +} + +void zmq::reaper_t::process_reap (socket_base_t *socket_) +{ + // Start termination of associated I/O object hierarchy. + socket_->terminate (); + sockets.push_back (socket_); + reap (); +} + diff --git a/src/reaper.hpp b/src/reaper.hpp new file mode 100644 index 0000000..8598cd9 --- /dev/null +++ b/src/reaper.hpp @@ -0,0 +1,81 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_REAPER_HPP_INCLUDED__ +#define __ZMQ_REAPER_HPP_INCLUDED__ + +#include "object.hpp" +#include "mailbox.hpp" +#include "poller.hpp" +#include "i_poll_events.hpp" + +namespace zmq +{ + + class reaper_t : public object_t, public i_poll_events + { + public: + + reaper_t (class ctx_t *ctx_, uint32_t tid_); + ~reaper_t (); + + mailbox_t *get_mailbox (); + + void start (); + void stop (); + + // i_poll_events implementation. + void in_event (); + void out_event (); + void timer_event (int id_); + + private: + + void reap (); + + // Command handlers. + void process_stop (); + void process_reap (class socket_base_t *socket_); + + // List of all sockets being terminated. + typedef std::vector sockets_t; + sockets_t sockets; + + // Reaper thread accesses incoming commands via this mailbox. + mailbox_t mailbox; + + // Handle associated with mailbox' file descriptor. + poller_t::handle_t mailbox_handle; + + // I/O multiplexing is performed using a poller object. + poller_t *poller; + + // If true, we were already asked to terminate. + bool terminating; + + // If true, timer till next reaping is running. + bool has_timer; + + reaper_t (const reaper_t&); + const reaper_t &operator = (const reaper_t&); + }; + +} + +#endif diff --git a/src/socket_base.cpp b/src/socket_base.cpp index f19187f..0643d4d 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -570,13 +570,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::close () { - // Start termination of associated I/O object hierarchy. - terminate (); - - // Ask context to zombify this socket. In other words, transfer - // the ownership of the socket from this application thread - // to the context which will take care of the rest of shutdown process. - zombify_socket (this); + // Transfer the ownership of the socket from this application thread + // to the reaper thread which will take care of the rest of shutdown + // process. + send_reap (this); return 0; } @@ -627,7 +624,7 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_) return session; } -bool zmq::socket_base_t::dezombify () +bool zmq::socket_base_t::reap () { // Process any commands from other threads/sockets that may be available // at the moment. Ultimately, socket will be destroyed. @@ -635,6 +632,11 @@ bool zmq::socket_base_t::dezombify () // If the object was already marked as destroyed, finish the deallocation. if (destroyed) { + + // Remove the socket from the context. + destroy_socket (this); + + // Deallocate. own_t::process_destroy (); return true; } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 057f8b1..a74b7d0 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -42,6 +42,8 @@ namespace zmq public own_t, public array_item_t { + friend class reaper_t; + public: // Create a socket of a specified type. @@ -82,9 +84,9 @@ namespace zmq void activated (class writer_t *pipe_); void terminated (class writer_t *pipe_); - // This function should be called only on zombie sockets. It tries - // to deallocate the zombie. Returns true is object is destroyed. - bool dezombify (); + // This function should be called only on sockets that are already + // closed -- from the reaper thread. It tries to finalise the socket. + bool reap (); protected: -- cgit v1.2.3