diff options
Diffstat (limited to 'src/ctx.hpp')
-rw-r--r-- | src/ctx.hpp | 43 |
1 files changed, 21 insertions, 22 deletions
diff --git a/src/ctx.hpp b/src/ctx.hpp index 92684e6..c07711e 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -64,11 +64,9 @@ namespace zmq // after the last one is closed. int terminate (); - // Create a socket. + // Create and destroy a socket. class socket_base_t *create_socket (int type_); - - // Make socket a zombie. - void zombify_socket (socket_base_t *socket_); + void destroy_socket (class socket_base_t *socket_); // Send command to the destination thread. void send_command (uint32_t tid_, const command_t &command_); @@ -78,6 +76,9 @@ namespace zmq // Returns NULL is no I/O thread is available. class io_thread_t *choose_io_thread (uint64_t affinity_); + // Returns reaper thread object. + class object_t *get_reaper (); + // Management of inproc endpoints. int register_endpoint (const char *addr_, endpoint_t &endpoint_); void unregister_endpoints (class socket_base_t *socket_); @@ -86,41 +87,36 @@ namespace zmq // Logging. void log (zmq_msg_t *msg_); + enum { + term_tid = 0, + reaper_tid = 1 + }; + private: ~ctx_t (); - // Sockets belonging to this context. + // Sockets belonging to this context. We need the list so that + // we can notify the sockets when zmq_term() is called. The sockets + // will return ETERM then. typedef array_t <socket_base_t> sockets_t; sockets_t sockets; - // List of sockets that were already closed but not yet deallocated. - // These sockets still have some pipes and I/O objects attached. - typedef std::vector <socket_base_t*> zombies_t; - zombies_t zombies; - // List of unused thread slots. typedef std::vector <uint32_t> emtpy_slots_t; emtpy_slots_t empty_slots; - // If true, shutdown thread wants to be informed when there are no - // more open sockets. Do so by posting no_sockets_sync semaphore. - // Note that this variable is synchronised by slot_sync mutex. - bool no_sockets_notify; - - // Object used by zmq_term to wait while all the sockets are closed - // by different application threads. - semaphore_t no_sockets_sync; + // If true, zmq_term was already called. + bool terminating; // Synchronisation of accesses to global slot-related data: - // sockets, zombies, empty_slots, terminated. It also synchronises + // sockets, empty_slots, terminating. It also synchronises // access to zombie sockets as such (as oposed to slots) and provides // a memory barrier to ensure that all CPU cores see the same data. mutex_t slot_sync; - // This function attempts to deallocate as many zombie sockets as - // possible. It must be called within a slot_sync critical section. - void dezombify (); + // The reaper thread. + class reaper_t *reaper; // I/O threads. typedef std::vector <class io_thread_t*> io_threads_t; @@ -130,6 +126,9 @@ namespace zmq uint32_t slot_count; mailbox_t **slots; + // Mailbox for zmq_term thread. + mailbox_t term_mailbox; + // List of inproc endpoints within this context. typedef std::map <std::string, endpoint_t> endpoints_t; endpoints_t endpoints; |