From 05d908492dc382941fc633ad7082b5bd86e84e67 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 6 Aug 2010 17:49:37 +0200 Subject: WIP: Socket migration between threads, new zmq_close() semantics Sockets may now be migrated between OS threads; sockets may not be used by more than one thread at any time. To migrate a socket to another thread the caller must ensure that a full memory barrier is called before using the socket from the target thread. The new zmq_close() semantics implement the behaviour discussed at: http://lists.zeromq.org/pipermail/zeromq-dev/2010-July/004244.html Specifically, zmq_close() is now deterministic and while it still returns immediately, it does not discard any data that may still be queued for sending. Further, zmq_term() will now block until all outstanding data has been sent. TODO: Many bugs have been introduced, needs testing. Further, SO_LINGER or an equivalent mechanism (possibly a configurable timeout to zmq_term()) needs to be implemented. --- src/ctx.hpp | 94 ++++++++++++++++++++++++------------------------------------- 1 file changed, 37 insertions(+), 57 deletions(-) (limited to 'src/ctx.hpp') diff --git a/src/ctx.hpp b/src/ctx.hpp index c96a923..cb9a2d9 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -26,7 +26,9 @@ #include #include "signaler.hpp" +#include "semaphore.hpp" #include "ypipe.hpp" +#include "yarray.hpp" #include "config.hpp" #include "mutex.hpp" #include "stdint.hpp" @@ -55,29 +57,19 @@ namespace zmq // Create a socket. class socket_base_t *create_socket (int type_); - // Destroy a socket. - void destroy_socket (); + // Make socket a zombie. + void zombify (socket_base_t *socket_); - // Called by app_thread_t when it has no more sockets. The function - // should disassociate the object from the current OS thread. - void no_sockets (class app_thread_t *thread_); + // Send command to the destination slot. + void send_command (uint32_t slot_, const command_t &command_); - // Send command to the destination thread. - void send_command (uint32_t destination_, const command_t &command_); - - // Receive command from another thread. - bool recv_command (uint32_t thread_slot_, command_t *command_, - bool block_); + // Receive command from the source slot. + bool recv_command (uint32_t slot_, command_t *command_, bool block_); // Returns the I/O thread that is the least busy at the moment. // Taskset specifies which I/O threads are eligible (0 = all). class io_thread_t *choose_io_thread (uint64_t taskset_); - // All pipes are registered with the context so that even the - // orphaned pipes can be deallocated on the terminal shutdown. - void register_pipe (class pipe_t *pipe_); - void unregister_pipe (class pipe_t *pipe_); - // Management of inproc endpoints. int register_endpoint (const char *addr_, class socket_base_t *socket_); void unregister_endpoints (class socket_base_t *socket_); @@ -87,57 +79,45 @@ namespace zmq ~ctx_t (); - struct app_thread_info_t - { - // If false, 0MQ application thread is free, there's no associated - // OS thread. - bool associated; + // Sockets belonging to this context. + typedef yarray_t sockets_t; + sockets_t sockets; + + // Array of sockets that were already closed but not yet deallocated. + // These sockets still have some pipes and I/O objects attached. + typedef yarray_t zombies_t; + zombies_t zombies; + + // List of unused slots. + typedef std::vector emtpy_slots_t; + emtpy_slots_t empty_slots; - // ID of the associated OS thread. If 'associated' is false, - // this field contains bogus data. - thread_t::id_t tid; + // If true, shutdown thread wants to be informed when there are no + // more open sockets. Do so by posting no_sockets_sync semaphore. + // Note that this variable is synchronised by slot_sync mutex. + bool no_sockets_notify; - // Pointer to the 0MQ application thread object. - class app_thread_t *app_thread; - }; + // Object used by zmq_term to wait while all the sockets are closed + // by different application threads. + semaphore_t no_sockets_sync; - // Application threads. - typedef std::vector app_threads_t; - app_threads_t app_threads; + // Synchronisation of accesses to global slot-related data: + // sockets, zombies, empty_slots, terminated. It also synchronises + // access to zombie sockets as such (as oposed to slots) and provides + // a memory barrier to ensure that all CPU cores see the same data. + mutex_t slot_sync; - // Synchronisation of accesses to shared application thread data. - mutex_t app_threads_sync; + // This function attempts to deallocate as many zombie sockets as + // possible. It must be called within a slot_sync critical section. + void dezombify (); // I/O threads. typedef std::vector io_threads_t; io_threads_t io_threads; // Array of pointers to signalers for both application and I/O threads. - int signalers_count; - signaler_t **signalers; - - // As pipes may reside in orphaned state in particular moments - // of the pipe shutdown process, i.e. neither pipe reader nor - // pipe writer hold reference to the pipe, we have to hold references - // to all pipes in context so that we can deallocate them - // during terminal shutdown even though it conincides with the - // pipe being in the orphaned state. - typedef std::set pipes_t; - pipes_t pipes; - - // Synchronisation of access to the pipes repository. - mutex_t pipes_sync; - - // Number of sockets alive. - int sockets; - - // If true, zmq_term was already called. When last socket is closed - // the whole 0MQ infrastructure should be deallocated. - bool terminated; - - // Synchronisation of access to the termination data (socket count - // and 'terminated' flag). - mutex_t term_sync; + uint32_t slot_count; + signaler_t **slots; // List of inproc endpoints within this context. typedef std::map endpoints_t; -- cgit v1.2.3