diff options
Diffstat (limited to 'src/ctx.hpp')
-rw-r--r-- | src/ctx.hpp | 141 |
1 files changed, 69 insertions, 72 deletions
diff --git a/src/ctx.hpp b/src/ctx.hpp index c96a923..c6ea4ce 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -1,39 +1,53 @@ /* - Copyright (c) 2007-2010 iMatix Corporation + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by + the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. 0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. + GNU Lesser General Public License for more details. - You should have received a copy of the Lesser GNU General Public License + You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. */ #ifndef __ZMQ_CTX_HPP_INCLUDED__ #define __ZMQ_CTX_HPP_INCLUDED__ -#include <vector> -#include <set> #include <map> +#include <vector> #include <string> +#include <stdarg.h> + +#include "../include/zmq.h" -#include "signaler.hpp" +#include "mailbox.hpp" +#include "semaphore.hpp" #include "ypipe.hpp" +#include "array.hpp" #include "config.hpp" #include "mutex.hpp" #include "stdint.hpp" #include "thread.hpp" +#include "options.hpp" namespace zmq { + // Information associated with inproc endpoint. Note that endpoint options + // are registered as well so that the peer can access them without a need + // for synchronisation, handshaking or similar. + struct endpoint_t + { + class socket_base_t *socket; + options_t options; + }; // Context object encapsulates all the global state associated with // the library. @@ -50,104 +64,87 @@ namespace zmq // no more sockets open it'll cause all the infrastructure to be shut // down. If there are open sockets still, the deallocation happens // after the last one is closed. - int term (); + int terminate (); - // Create a socket. + // Create and destroy a socket. class socket_base_t *create_socket (int type_); - - // Destroy a socket. - void destroy_socket (); - - // Called by app_thread_t when it has no more sockets. The function - // should disassociate the object from the current OS thread. - void no_sockets (class app_thread_t *thread_); + void destroy_socket (class socket_base_t *socket_); // Send command to the destination thread. - void send_command (uint32_t destination_, const command_t &command_); - - // Receive command from another thread. - bool recv_command (uint32_t thread_slot_, command_t *command_, - bool block_); + void send_command (uint32_t tid_, const command_t &command_); // Returns the I/O thread that is the least busy at the moment. - // Taskset specifies which I/O threads are eligible (0 = all). - class io_thread_t *choose_io_thread (uint64_t taskset_); + // Affinity specifies which I/O threads are eligible (0 = all). + // Returns NULL is no I/O thread is available. + class io_thread_t *choose_io_thread (uint64_t affinity_); - // All pipes are registered with the context so that even the - // orphaned pipes can be deallocated on the terminal shutdown. - void register_pipe (class pipe_t *pipe_); - void unregister_pipe (class pipe_t *pipe_); + // Returns reaper thread object. + class object_t *get_reaper (); // Management of inproc endpoints. - int register_endpoint (const char *addr_, class socket_base_t *socket_); + int register_endpoint (const char *addr_, endpoint_t &endpoint_); void unregister_endpoints (class socket_base_t *socket_); - class socket_base_t *find_endpoint (const char *addr_); + endpoint_t find_endpoint (const char *addr_); + + // Logging. + void log (const char *format_, va_list args_); + + enum { + term_tid = 0, + reaper_tid = 1 + }; private: ~ctx_t (); - struct app_thread_info_t - { - // If false, 0MQ application thread is free, there's no associated - // OS thread. - bool associated; + // Sockets belonging to this context. We need the list so that + // we can notify the sockets when zmq_term() is called. The sockets + // will return ETERM then. + typedef array_t <socket_base_t> sockets_t; + sockets_t sockets; - // ID of the associated OS thread. If 'associated' is false, - // this field contains bogus data. - thread_t::id_t tid; + // List of unused thread slots. + typedef std::vector <uint32_t> emtpy_slots_t; + emtpy_slots_t empty_slots; - // Pointer to the 0MQ application thread object. - class app_thread_t *app_thread; - }; + // If true, zmq_term was already called. + bool terminating; - // Application threads. - typedef std::vector <app_thread_info_t> app_threads_t; - app_threads_t app_threads; + // Synchronisation of accesses to global slot-related data: + // sockets, empty_slots, terminating. It also synchronises + // access to zombie sockets as such (as oposed to slots) and provides + // a memory barrier to ensure that all CPU cores see the same data. + mutex_t slot_sync; - // Synchronisation of accesses to shared application thread data. - mutex_t app_threads_sync; + // The reaper thread. + class reaper_t *reaper; // I/O threads. typedef std::vector <class io_thread_t*> io_threads_t; io_threads_t io_threads; - // Array of pointers to signalers for both application and I/O threads. - int signalers_count; - signaler_t **signalers; - - // As pipes may reside in orphaned state in particular moments - // of the pipe shutdown process, i.e. neither pipe reader nor - // pipe writer hold reference to the pipe, we have to hold references - // to all pipes in context so that we can deallocate them - // during terminal shutdown even though it conincides with the - // pipe being in the orphaned state. - typedef std::set <class pipe_t*> pipes_t; - pipes_t pipes; + // Array of pointers to mailboxes for both application and I/O threads. + uint32_t slot_count; + mailbox_t **slots; - // Synchronisation of access to the pipes repository. - mutex_t pipes_sync; - - // Number of sockets alive. - int sockets; - - // If true, zmq_term was already called. When last socket is closed - // the whole 0MQ infrastructure should be deallocated. - bool terminated; - - // Synchronisation of access to the termination data (socket count - // and 'terminated' flag). - mutex_t term_sync; + // Mailbox for zmq_term thread. + mailbox_t term_mailbox; // List of inproc endpoints within this context. - typedef std::map <std::string, class socket_base_t*> endpoints_t; + typedef std::map <std::string, endpoint_t> endpoints_t; endpoints_t endpoints; // Synchronisation of access to the list of inproc endpoints. mutex_t endpoints_sync; + // PUB socket for logging. The socket is shared among all the threads, + // thus it is synchronised by a mutex. + class socket_base_t *log_socket; + mutex_t log_sync; + ctx_t (const ctx_t&); - void operator = (const ctx_t&); + const ctx_t &operator = (const ctx_t&); }; } |