From b8b4acef4c2ba1a169ce84c1fb4c70a5676ebba3 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 6 Aug 2009 10:47:34 +0200 Subject: dispatcher renamed to context --- src/context.hpp | 170 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 src/context.hpp (limited to 'src/context.hpp') diff --git a/src/context.hpp b/src/context.hpp new file mode 100644 index 0000000..7701ef7 --- /dev/null +++ b/src/context.hpp @@ -0,0 +1,170 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_CONTEXT_HPP_INCLUDED__ +#define __ZMQ_CONTEXT_HPP_INCLUDED__ + +#include +#include +#include + +#include "i_signaler.hpp" +#include "ypipe.hpp" +#include "command.hpp" +#include "config.hpp" +#include "mutex.hpp" +#include "stdint.hpp" + +namespace zmq +{ + + // Dispatcher implements bidirectional thread-safe passing of commands + // between N threads. It consists of a ypipes to pass commands and + // signalers to wake up the receiver thread when new commands are + // available. Note that context is inefficient for passing messages + // within a thread (sender thread = receiver thread). The optimisation is + // not part of the class and should be implemented by individual threads + // (presumably by calling the command handling function directly). + + class context_t + { + public: + + // Create the context object. Matrix of pipes to communicate between + // each socket and each I/O thread is created along with appropriate + // signalers. + context_t (int app_threads_, int io_threads_); + + // To be called to terminate the whole infrastructure (zmq_term). + void shutdown (); + + // Create a socket engine. + struct i_api *create_socket (int type_); + + // Returns number of thread slots in the context. To be used by + // individual threads to find out how many distinct signals can be + // received. + int thread_slot_count (); + + // Send command from the source to the destination. + inline void write (int source_, int destination_, + const command_t &command_) + { + command_pipe_t &pipe = + command_pipes [source_ * signalers.size () + destination_]; + pipe.write (command_); + if (!pipe.flush ()) + signalers [destination_]->signal (source_); + } + + // Receive command from the source. Returns false if there is no + // command available. + inline bool read (int source_, int destination_, command_t *command_) + { + return command_pipes [source_ * signalers.size () + + destination_].read (command_); + } + + // Creates new pipe. + void create_pipe (class object_t *reader_parent_, + class object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, + class pipe_reader_t **reader_, class pipe_writer_t **writer_); + + // Deallocates the pipe. + void destroy_pipe (class pipe_t *pipe_); + + // Registers existing session object as an inproc endpoint. + int register_inproc_endpoint (const char *endpoint_, + class session_t *session_); + + // Retrieves an inproc endpoint. Increments the command sequence number + // of the object by one. Caller is thus bound to send the command + // to the connection after invoking this function. Returns NULL if + // the endpoint doesn't exist. + class object_t *get_inproc_endpoint (const char *endpoint_); + + // Removes all the inproc endpoints associated with the given session + // object from the global repository. + void unregister_inproc_endpoints (class session_t *session_); + + // Returns the I/O thread that is the least busy at the moment. + // Taskset specifies which I/O threads are eligible (0 = all). + class io_thread_t *choose_io_thread (uint64_t taskset_); + + private: + + // Clean-up. + ~context_t (); + + // Returns the app thread associated with the current thread. + // NULL if we are out of app thread slots. + class app_thread_t *choose_app_thread (); + + // Application threads. + typedef std::vector app_threads_t; + app_threads_t app_threads; + + // I/O threads. + typedef std::vector io_threads_t; + io_threads_t io_threads; + + // Signalers for both application and I/O threads. + std::vector signalers; + + // Pipe to hold the commands. + typedef ypipe_t command_pipe_t; + + // NxN matrix of command pipes. + command_pipe_t *command_pipes; + + // Synchronisation of accesses to shared thread data. + mutex_t threads_sync; + + // Global repository of pipes. It's used only on terminal shutdown + // to deallocate all the pipes irrespective of whether they are + // referenced from pipe_reader, pipe_writer or both. + struct pipe_info_t + { + class pipe_t *pipe; + class pipe_reader_t *reader; + class pipe_writer_t *writer; + }; + typedef std::vector pipes_t; + pipes_t pipes; + + // Synchronisation of access to global repository of pipes. + mutex_t pipes_sync; + + // Global repository of available inproc endpoints. + typedef std::map inproc_endpoints_t; + inproc_endpoints_t inproc_endpoints; + + // Synchronisation of access to the global repository + // of inproc endpoints. + mutex_t inproc_endpoint_sync; + + context_t (const context_t&); + void operator = (const context_t&); + }; + +} + +#endif + -- cgit v1.2.3