From 80ac398bba928fa7f245d2e107071677a13185cf Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 9 Feb 2011 15:32:15 +0100 Subject: Initial implementation of reaper thread. Reaper thread destroys the socket asynchronously. zmq_term() can be interrupted by a signal (EINTR). zmq_socket() will return ETERM after zmq_term() was called. Signed-off-by: Martin Sustrik --- src/ctx.hpp | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) (limited to 'src/ctx.hpp') diff --git a/src/ctx.hpp b/src/ctx.hpp index 92684e6..c07711e 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -64,11 +64,9 @@ namespace zmq // after the last one is closed. int terminate (); - // Create a socket. + // Create and destroy a socket. class socket_base_t *create_socket (int type_); - - // Make socket a zombie. - void zombify_socket (socket_base_t *socket_); + void destroy_socket (class socket_base_t *socket_); // Send command to the destination thread. void send_command (uint32_t tid_, const command_t &command_); @@ -78,6 +76,9 @@ namespace zmq // Returns NULL is no I/O thread is available. class io_thread_t *choose_io_thread (uint64_t affinity_); + // Returns reaper thread object. + class object_t *get_reaper (); + // Management of inproc endpoints. int register_endpoint (const char *addr_, endpoint_t &endpoint_); void unregister_endpoints (class socket_base_t *socket_); @@ -86,41 +87,36 @@ namespace zmq // Logging. void log (zmq_msg_t *msg_); + enum { + term_tid = 0, + reaper_tid = 1 + }; + private: ~ctx_t (); - // Sockets belonging to this context. + // Sockets belonging to this context. We need the list so that + // we can notify the sockets when zmq_term() is called. The sockets + // will return ETERM then. typedef array_t sockets_t; sockets_t sockets; - // List of sockets that were already closed but not yet deallocated. - // These sockets still have some pipes and I/O objects attached. - typedef std::vector zombies_t; - zombies_t zombies; - // List of unused thread slots. typedef std::vector emtpy_slots_t; emtpy_slots_t empty_slots; - // If true, shutdown thread wants to be informed when there are no - // more open sockets. Do so by posting no_sockets_sync semaphore. - // Note that this variable is synchronised by slot_sync mutex. - bool no_sockets_notify; - - // Object used by zmq_term to wait while all the sockets are closed - // by different application threads. - semaphore_t no_sockets_sync; + // If true, zmq_term was already called. + bool terminating; // Synchronisation of accesses to global slot-related data: - // sockets, zombies, empty_slots, terminated. It also synchronises + // sockets, empty_slots, terminating. It also synchronises // access to zombie sockets as such (as oposed to slots) and provides // a memory barrier to ensure that all CPU cores see the same data. mutex_t slot_sync; - // This function attempts to deallocate as many zombie sockets as - // possible. It must be called within a slot_sync critical section. - void dezombify (); + // The reaper thread. + class reaper_t *reaper; // I/O threads. typedef std::vector io_threads_t; @@ -130,6 +126,9 @@ namespace zmq uint32_t slot_count; mailbox_t **slots; + // Mailbox for zmq_term thread. + mailbox_t term_mailbox; + // List of inproc endpoints within this context. typedef std::map endpoints_t; endpoints_t endpoints; -- cgit v1.2.3