From b8b4acef4c2ba1a169ce84c1fb4c70a5676ebba3 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 6 Aug 2009 10:47:34 +0200 Subject: dispatcher renamed to context --- src/Makefile.am | 4 +- src/app_thread.cpp | 8 +- src/app_thread.hpp | 4 +- src/context.cpp | 266 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/context.hpp | 170 +++++++++++++++++++++++++++++++++ src/dispatcher.cpp | 266 ---------------------------------------------------- src/dispatcher.hpp | 170 --------------------------------- src/io_thread.cpp | 8 +- src/io_thread.hpp | 2 +- src/object.cpp | 26 ++--- src/object.hpp | 6 +- src/pipe.hpp | 8 +- src/pipe_reader.cpp | 2 +- src/pipe_reader.hpp | 6 +- src/pipe_writer.hpp | 6 +- src/safe_object.cpp | 6 +- src/safe_object.hpp | 2 +- src/zmq.cpp | 15 ++- 18 files changed, 487 insertions(+), 488 deletions(-) create mode 100644 src/context.cpp create mode 100644 src/context.hpp delete mode 100644 src/dispatcher.cpp delete mode 100644 src/dispatcher.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index e6d09ca..27f4412 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -8,10 +8,10 @@ libzmq_la_SOURCES = \ command.hpp \ config.hpp \ connecter.hpp \ + context.hpp \ data_distributor.hpp \ decoder.hpp \ devpoll.hpp \ - dispatcher.hpp \ dummy_aggregator.hpp \ dummy_distributor.hpp \ encoder.hpp \ @@ -70,9 +70,9 @@ libzmq_la_SOURCES = \ zmq_tcp_engine.hpp \ app_thread.cpp \ connecter.cpp \ + context.cpp \ data_distributor.cpp \ devpoll.hpp \ - dispatcher.cpp \ dummy_aggregator.cpp \ dummy_distributor.cpp \ epoll.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 2406dbd..9cc61c7 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -26,7 +26,7 @@ #endif #include "app_thread.hpp" -#include "dispatcher.hpp" +#include "context.hpp" #include "err.hpp" #include "session.hpp" #include "pipe.hpp" @@ -51,8 +51,8 @@ #define ZMQ_DELAY_COMMANDS #endif -zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : - object_t (dispatcher_, thread_slot_), +zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) : + object_t (context_, thread_slot_), tid (0), last_processing_time (0) { @@ -213,7 +213,7 @@ void zmq::app_thread_t::process_commands (bool block_) for (int i = 0; i != thread_slot_count (); i++) { if (signals & (ypollset_t::signals_t (1) << i)) { command_t cmd; - while (dispatcher->read (i, get_thread_slot (), &cmd)) + while (context->read (i, get_thread_slot (), &cmd)) cmd.destination->process_command (cmd); } } diff --git a/src/app_thread.hpp b/src/app_thread.hpp index ffe5596..8295c2f 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -34,7 +34,7 @@ namespace zmq { public: - app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); + app_thread_t (class context_t *context_, int thread_slot_); // To be called when the whole infrastrucure is being closed. void shutdown (); @@ -47,7 +47,7 @@ namespace zmq struct i_api *create_socket (int type_); // Nota bene: The following two functions are accessed from different - // threads. The caller (dispatcher) is responsible for synchronisation + // threads. The caller (context) is responsible for synchronisation // of accesses. // Returns true is current thread is associated with the app thread. diff --git a/src/context.cpp b/src/context.cpp new file mode 100644 index 0000000..ab4643e --- /dev/null +++ b/src/context.cpp @@ -0,0 +1,266 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. 
+ + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../include/zmq.h" + +#include "context.hpp" +#include "app_thread.hpp" +#include "io_thread.hpp" +#include "platform.hpp" +#include "err.hpp" +#include "pipe.hpp" +#include "pipe_reader.hpp" +#include "pipe_writer.hpp" +#include "session.hpp" +#include "i_api.hpp" + +#if defined ZMQ_HAVE_WINDOWS +#include "windows.h" +#endif + +zmq::context_t::context_t (int app_threads_, int io_threads_) +{ +#ifdef ZMQ_HAVE_WINDOWS + // Intialise Windows sockets. Note that WSAStartup can be called multiple + // times given that WSACleanup will be called for each WSAStartup. + WORD version_requested = MAKEWORD (2, 2); + WSADATA wsa_data; + int rc = WSAStartup (version_requested, &wsa_data); + zmq_assert (rc == 0); + zmq_assert (LOBYTE (wsa_data.wVersion) == 2 && + HIBYTE (wsa_data.wVersion) == 2); +#endif + + // Create application thread proxies. + for (int i = 0; i != app_threads_; i++) { + app_thread_t *app_thread = new app_thread_t (this, i); + zmq_assert (app_thread); + app_threads.push_back (app_thread); + signalers.push_back (app_thread->get_signaler ()); + } + + // Create I/O thread objects. + for (int i = 0; i != io_threads_; i++) { + io_thread_t *io_thread = new io_thread_t (this, i + app_threads_); + zmq_assert (io_thread); + io_threads.push_back (io_thread); + signalers.push_back (io_thread->get_signaler ()); + } + + // Create command pipe matrix. + command_pipes = new command_pipe_t [signalers.size () * signalers.size ()]; + zmq_assert (command_pipes); + + // Launch I/O threads. + for (int i = 0; i != io_threads_; i++) + io_threads [i]->start (); +} + +void zmq::context_t::shutdown () +{ + delete this; +} + +zmq::context_t::~context_t () +{ + // Ask I/O threads to terminate. + for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) + io_threads [i]->stop (); + + // Wait till I/O threads actually terminate. + for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) + io_threads [i]->join (); + + // At this point the current thread is the only thread with access to + // our internal data. Deallocation will be done exclusively in this thread. + for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) + app_threads [i]->shutdown (); + for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) + io_threads [i]->shutdown (); + + delete [] command_pipes; + + // Deallocate all the pipes, pipe readers and pipe writers. + for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it++) { + delete it->pipe; + delete it->reader; + delete it->writer; + } + +#ifdef ZMQ_HAVE_WINDOWS + // On Windows, uninitialise socket layer. 
+ int rc = WSACleanup (); + wsa_assert (rc != SOCKET_ERROR); +#endif +} + +int zmq::context_t::thread_slot_count () +{ + return signalers.size (); +} + +zmq::i_api *zmq::context_t::create_socket (int type_) +{ + threads_sync.lock (); + app_thread_t *thread = choose_app_thread (); + if (!thread) { + threads_sync.unlock (); + return NULL; + } + i_api *s = thread->create_socket (type_); + threads_sync.unlock (); + return s; +} + +zmq::app_thread_t *zmq::context_t::choose_app_thread () +{ + // Check whether thread ID is already assigned. If so, return it. + for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) + if (app_threads [i]->is_current ()) + return app_threads [i]; + + // Check whether there's an unused thread slot in the cotext. + for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) + if (app_threads [i]->make_current ()) + return app_threads [i]; + + // Thread limit was exceeded. + errno = EMFILE; + return NULL; +} + +zmq::io_thread_t *zmq::context_t::choose_io_thread (uint64_t taskset_) +{ + zmq_assert (io_threads.size () > 0); + + // Find the I/O thread with minimum load. + int min_load = io_threads [0]->get_load (); + io_threads_t::size_type result = 0; + for (io_threads_t::size_type i = 1; i != io_threads.size (); i++) { + if (!taskset_ || (taskset_ & (uint64_t (1) << i))) { + int load = io_threads [i]->get_load (); + if (load < min_load) { + min_load = load; + result = i; + } + } + } + + return io_threads [result]; +} + +void zmq::context_t::create_pipe (object_t *reader_parent_, + object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, + pipe_reader_t **reader_, pipe_writer_t **writer_) +{ + // Create the pipe, reader & writer triple. + pipe_t *pipe = new pipe_t; + zmq_assert (pipe); + pipe_reader_t *reader = new pipe_reader_t (reader_parent_, pipe, + hwm_, lwm_); + zmq_assert (reader); + pipe_writer_t *writer = new pipe_writer_t (writer_parent_, pipe, reader, + hwm_, lwm_); + zmq_assert (writer); + reader->set_peer (writer); + + // Store the pipe in the repository. + pipe_info_t info = {pipe, reader, writer}; + pipes_sync.lock (); + pipe->set_index (pipes.size ()); + pipes.push_back (info); + pipes_sync.unlock (); + + *reader_ = reader; + *writer_ = writer; +} + +void zmq::context_t::destroy_pipe (pipe_t *pipe_) +{ + // Remove the pipe from the repository. + pipe_info_t info; + pipes_sync.lock (); + pipes_t::size_type i = pipe_->get_index (); + info = pipes [i]; + pipes [i] = pipes.back (); + pipes.pop_back (); + pipes_sync.unlock (); + + // Deallocate the pipe and associated pipe reader & pipe writer. 
+ zmq_assert (info.pipe == pipe_); + delete info.pipe; + delete info.reader; + delete info.writer; +} + +int zmq::context_t::register_inproc_endpoint (const char *endpoint_, + session_t *session_) +{ + inproc_endpoint_sync.lock (); + inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_); + + if (it != inproc_endpoints.end ()) { + inproc_endpoint_sync.unlock (); + errno = EADDRINUSE; + return -1; + } + + inproc_endpoints.insert (std::make_pair (endpoint_, session_)); + + inproc_endpoint_sync.unlock (); + return 0; +} + +zmq::object_t *zmq::context_t::get_inproc_endpoint (const char *endpoint_) +{ + inproc_endpoint_sync.lock (); + inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_); + + if (it == inproc_endpoints.end ()) { + inproc_endpoint_sync.unlock (); + errno = EADDRNOTAVAIL; + return NULL; + } + + it->second->inc_seqnum (); + object_t *session = it->second; + + inproc_endpoint_sync.unlock (); + return session; +} + +void zmq::context_t::unregister_inproc_endpoints (session_t *session_) +{ + inproc_endpoint_sync.lock (); + + // Remove the connection from the repository. + // TODO: Yes, the algorithm has O(n^2) complexity. Should be O(log n). + for (inproc_endpoints_t::iterator it = inproc_endpoints.begin (); + it != inproc_endpoints.end ();) { + if (it->second == session_) { + inproc_endpoints.erase (it); + it = inproc_endpoints.begin (); + } + else + it++; + } + + inproc_endpoint_sync.unlock (); +} + diff --git a/src/context.hpp b/src/context.hpp new file mode 100644 index 0000000..7701ef7 --- /dev/null +++ b/src/context.hpp @@ -0,0 +1,170 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_CONTEXT_HPP_INCLUDED__ +#define __ZMQ_CONTEXT_HPP_INCLUDED__ + +#include +#include +#include + +#include "i_signaler.hpp" +#include "ypipe.hpp" +#include "command.hpp" +#include "config.hpp" +#include "mutex.hpp" +#include "stdint.hpp" + +namespace zmq +{ + + // Dispatcher implements bidirectional thread-safe passing of commands + // between N threads. It consists of a ypipes to pass commands and + // signalers to wake up the receiver thread when new commands are + // available. Note that context is inefficient for passing messages + // within a thread (sender thread = receiver thread). The optimisation is + // not part of the class and should be implemented by individual threads + // (presumably by calling the command handling function directly). + + class context_t + { + public: + + // Create the context object. Matrix of pipes to communicate between + // each socket and each I/O thread is created along with appropriate + // signalers. + context_t (int app_threads_, int io_threads_); + + // To be called to terminate the whole infrastructure (zmq_term). + void shutdown (); + + // Create a socket engine. + struct i_api *create_socket (int type_); + + // Returns number of thread slots in the context. 
To be used by + // individual threads to find out how many distinct signals can be + // received. + int thread_slot_count (); + + // Send command from the source to the destination. + inline void write (int source_, int destination_, + const command_t &command_) + { + command_pipe_t &pipe = + command_pipes [source_ * signalers.size () + destination_]; + pipe.write (command_); + if (!pipe.flush ()) + signalers [destination_]->signal (source_); + } + + // Receive command from the source. Returns false if there is no + // command available. + inline bool read (int source_, int destination_, command_t *command_) + { + return command_pipes [source_ * signalers.size () + + destination_].read (command_); + } + + // Creates new pipe. + void create_pipe (class object_t *reader_parent_, + class object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, + class pipe_reader_t **reader_, class pipe_writer_t **writer_); + + // Deallocates the pipe. + void destroy_pipe (class pipe_t *pipe_); + + // Registers existing session object as an inproc endpoint. + int register_inproc_endpoint (const char *endpoint_, + class session_t *session_); + + // Retrieves an inproc endpoint. Increments the command sequence number + // of the object by one. Caller is thus bound to send the command + // to the connection after invoking this function. Returns NULL if + // the endpoint doesn't exist. + class object_t *get_inproc_endpoint (const char *endpoint_); + + // Removes all the inproc endpoints associated with the given session + // object from the global repository. + void unregister_inproc_endpoints (class session_t *session_); + + // Returns the I/O thread that is the least busy at the moment. + // Taskset specifies which I/O threads are eligible (0 = all). + class io_thread_t *choose_io_thread (uint64_t taskset_); + + private: + + // Clean-up. + ~context_t (); + + // Returns the app thread associated with the current thread. + // NULL if we are out of app thread slots. + class app_thread_t *choose_app_thread (); + + // Application threads. + typedef std::vector app_threads_t; + app_threads_t app_threads; + + // I/O threads. + typedef std::vector io_threads_t; + io_threads_t io_threads; + + // Signalers for both application and I/O threads. + std::vector signalers; + + // Pipe to hold the commands. + typedef ypipe_t command_pipe_t; + + // NxN matrix of command pipes. + command_pipe_t *command_pipes; + + // Synchronisation of accesses to shared thread data. + mutex_t threads_sync; + + // Global repository of pipes. It's used only on terminal shutdown + // to deallocate all the pipes irrespective of whether they are + // referenced from pipe_reader, pipe_writer or both. + struct pipe_info_t + { + class pipe_t *pipe; + class pipe_reader_t *reader; + class pipe_writer_t *writer; + }; + typedef std::vector pipes_t; + pipes_t pipes; + + // Synchronisation of access to global repository of pipes. + mutex_t pipes_sync; + + // Global repository of available inproc endpoints. + typedef std::map inproc_endpoints_t; + inproc_endpoints_t inproc_endpoints; + + // Synchronisation of access to the global repository + // of inproc endpoints. + mutex_t inproc_endpoint_sync; + + context_t (const context_t&); + void operator = (const context_t&); + }; + +} + +#endif + diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp deleted file mode 100644 index 56a5e0b..0000000 --- a/src/dispatcher.cpp +++ /dev/null @@ -1,266 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. 
- - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "dispatcher.hpp" -#include "app_thread.hpp" -#include "io_thread.hpp" -#include "platform.hpp" -#include "err.hpp" -#include "pipe.hpp" -#include "pipe_reader.hpp" -#include "pipe_writer.hpp" -#include "session.hpp" -#include "i_api.hpp" - -#if defined ZMQ_HAVE_WINDOWS -#include "windows.h" -#endif - -zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) -{ -#ifdef ZMQ_HAVE_WINDOWS - // Intialise Windows sockets. Note that WSAStartup can be called multiple - // times given that WSACleanup will be called for each WSAStartup. - WORD version_requested = MAKEWORD (2, 2); - WSADATA wsa_data; - int rc = WSAStartup (version_requested, &wsa_data); - zmq_assert (rc == 0); - zmq_assert (LOBYTE (wsa_data.wVersion) == 2 && - HIBYTE (wsa_data.wVersion) == 2); -#endif - - // Create application thread proxies. - for (int i = 0; i != app_threads_; i++) { - app_thread_t *app_thread = new app_thread_t (this, i); - zmq_assert (app_thread); - app_threads.push_back (app_thread); - signalers.push_back (app_thread->get_signaler ()); - } - - // Create I/O thread objects. - for (int i = 0; i != io_threads_; i++) { - io_thread_t *io_thread = new io_thread_t (this, i + app_threads_); - zmq_assert (io_thread); - io_threads.push_back (io_thread); - signalers.push_back (io_thread->get_signaler ()); - } - - // Create command pipe matrix. - command_pipes = new command_pipe_t [signalers.size () * signalers.size ()]; - zmq_assert (command_pipes); - - // Launch I/O threads. - for (int i = 0; i != io_threads_; i++) - io_threads [i]->start (); -} - -void zmq::dispatcher_t::shutdown () -{ - delete this; -} - -zmq::dispatcher_t::~dispatcher_t () -{ - // Ask I/O threads to terminate. - for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) - io_threads [i]->stop (); - - // Wait till I/O threads actually terminate. - for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) - io_threads [i]->join (); - - // At this point the current thread is the only thread with access to - // our internal data. Deallocation will be done exclusively in this thread. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - app_threads [i]->shutdown (); - for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) - io_threads [i]->shutdown (); - - delete [] command_pipes; - - // Deallocate all the pipes, pipe readers and pipe writers. - for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it++) { - delete it->pipe; - delete it->reader; - delete it->writer; - } - -#ifdef ZMQ_HAVE_WINDOWS - // On Windows, uninitialise socket layer. 
- int rc = WSACleanup (); - wsa_assert (rc != SOCKET_ERROR); -#endif -} - -int zmq::dispatcher_t::thread_slot_count () -{ - return signalers.size (); -} - -zmq::i_api *zmq::dispatcher_t::create_socket (int type_) -{ - threads_sync.lock (); - app_thread_t *thread = choose_app_thread (); - if (!thread) { - threads_sync.unlock (); - return NULL; - } - i_api *s = thread->create_socket (type_); - threads_sync.unlock (); - return s; -} - -zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () -{ - // Check whether thread ID is already assigned. If so, return it. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - if (app_threads [i]->is_current ()) - return app_threads [i]; - - // Check whether there's an unused thread slot in the dispatcher. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - if (app_threads [i]->make_current ()) - return app_threads [i]; - - // Thread limit was exceeded. - errno = EMFILE; - return NULL; -} - -zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_) -{ - zmq_assert (io_threads.size () > 0); - - // Find the I/O thread with minimum load. - int min_load = io_threads [0]->get_load (); - io_threads_t::size_type result = 0; - for (io_threads_t::size_type i = 1; i != io_threads.size (); i++) { - if (!taskset_ || (taskset_ & (uint64_t (1) << i))) { - int load = io_threads [i]->get_load (); - if (load < min_load) { - min_load = load; - result = i; - } - } - } - - return io_threads [result]; -} - -void zmq::dispatcher_t::create_pipe (object_t *reader_parent_, - object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, - pipe_reader_t **reader_, pipe_writer_t **writer_) -{ - // Create the pipe, reader & writer triple. - pipe_t *pipe = new pipe_t; - zmq_assert (pipe); - pipe_reader_t *reader = new pipe_reader_t (reader_parent_, pipe, - hwm_, lwm_); - zmq_assert (reader); - pipe_writer_t *writer = new pipe_writer_t (writer_parent_, pipe, reader, - hwm_, lwm_); - zmq_assert (writer); - reader->set_peer (writer); - - // Store the pipe in the repository. - pipe_info_t info = {pipe, reader, writer}; - pipes_sync.lock (); - pipe->set_index (pipes.size ()); - pipes.push_back (info); - pipes_sync.unlock (); - - *reader_ = reader; - *writer_ = writer; -} - -void zmq::dispatcher_t::destroy_pipe (pipe_t *pipe_) -{ - // Remove the pipe from the repository. - pipe_info_t info; - pipes_sync.lock (); - pipes_t::size_type i = pipe_->get_index (); - info = pipes [i]; - pipes [i] = pipes.back (); - pipes.pop_back (); - pipes_sync.unlock (); - - // Deallocate the pipe and associated pipe reader & pipe writer. 
- zmq_assert (info.pipe == pipe_); - delete info.pipe; - delete info.reader; - delete info.writer; -} - -int zmq::dispatcher_t::register_inproc_endpoint (const char *endpoint_, - session_t *session_) -{ - inproc_endpoint_sync.lock (); - inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_); - - if (it != inproc_endpoints.end ()) { - inproc_endpoint_sync.unlock (); - errno = EADDRINUSE; - return -1; - } - - inproc_endpoints.insert (std::make_pair (endpoint_, session_)); - - inproc_endpoint_sync.unlock (); - return 0; -} - -zmq::object_t *zmq::dispatcher_t::get_inproc_endpoint (const char *endpoint_) -{ - inproc_endpoint_sync.lock (); - inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_); - - if (it == inproc_endpoints.end ()) { - inproc_endpoint_sync.unlock (); - errno = EADDRNOTAVAIL; - return NULL; - } - - it->second->inc_seqnum (); - object_t *session = it->second; - - inproc_endpoint_sync.unlock (); - return session; -} - -void zmq::dispatcher_t::unregister_inproc_endpoints (session_t *session_) -{ - inproc_endpoint_sync.lock (); - - // Remove the connection from the repository. - // TODO: Yes, the algorithm has O(n^2) complexity. Should be O(log n). - for (inproc_endpoints_t::iterator it = inproc_endpoints.begin (); - it != inproc_endpoints.end ();) { - if (it->second == session_) { - inproc_endpoints.erase (it); - it = inproc_endpoints.begin (); - } - else - it++; - } - - inproc_endpoint_sync.unlock (); -} - diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp deleted file mode 100644 index 07c35cd..0000000 --- a/src/dispatcher.hpp +++ /dev/null @@ -1,170 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__ -#define __ZMQ_DISPATCHER_HPP_INCLUDED__ - -#include -#include -#include - -#include "i_signaler.hpp" -#include "ypipe.hpp" -#include "command.hpp" -#include "config.hpp" -#include "mutex.hpp" -#include "stdint.hpp" - -namespace zmq -{ - - // Dispatcher implements bidirectional thread-safe passing of commands - // between N threads. It consists of a ypipes to pass commands and - // signalers to wake up the receiver thread when new commands are - // available. Note that dispatcher is inefficient for passing messages - // within a thread (sender thread = receiver thread). The optimisation is - // not part of the class and should be implemented by individual threads - // (presumably by calling the command handling function directly). - - class dispatcher_t - { - public: - - // Create the dispatcher object. Matrix of pipes to communicate between - // each socket and each I/O thread is created along with appropriate - // signalers. - dispatcher_t (int app_threads_, int io_threads_); - - // To be called to terminate the whole infrastructure (zmq_term). - void shutdown (); - - // Create a socket engine. 
- struct i_api *create_socket (int type_); - - // Returns number of thread slots in the dispatcher. To be used by - // individual threads to find out how many distinct signals can be - // received. - int thread_slot_count (); - - // Write command to the dispatcher. - inline void write (int source_, int destination_, - const command_t &command_) - { - command_pipe_t &pipe = - command_pipes [source_ * signalers.size () + destination_]; - pipe.write (command_); - if (!pipe.flush ()) - signalers [destination_]->signal (source_); - } - - // Read command from the dispatcher. Returns false if there is no - // command available. - inline bool read (int source_, int destination_, command_t *command_) - { - return command_pipes [source_ * signalers.size () + - destination_].read (command_); - } - - // Creates new pipe. - void create_pipe (class object_t *reader_parent_, - class object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, - class pipe_reader_t **reader_, class pipe_writer_t **writer_); - - // Deallocates the pipe. - void destroy_pipe (class pipe_t *pipe_); - - // Registers existing session object as an inproc endpoint. - int register_inproc_endpoint (const char *endpoint_, - class session_t *session_); - - // Retrieves an inproc endpoint. Increments the command sequence number - // of the object by one. Caller is thus bound to send the command - // to the connection after invoking this function. Returns NULL if - // the endpoint doesn't exist. - class object_t *get_inproc_endpoint (const char *endpoint_); - - // Removes all the inproc endpoints associated with the given session - // object from the global repository. - void unregister_inproc_endpoints (class session_t *session_); - - // Returns the I/O thread that is the least busy at the moment. - // Taskset specifies which I/O threads are eligible (0 = all). - class io_thread_t *choose_io_thread (uint64_t taskset_); - - private: - - // Clean-up. - ~dispatcher_t (); - - // Returns the app thread associated with the current thread. - // NULL if we are out of app thread slots. - class app_thread_t *choose_app_thread (); - - // Application threads. - typedef std::vector app_threads_t; - app_threads_t app_threads; - - // I/O threads. - typedef std::vector io_threads_t; - io_threads_t io_threads; - - // Signalers for both application and I/O threads. - std::vector signalers; - - // Pipe to hold the commands. - typedef ypipe_t command_pipe_t; - - // NxN matrix of command pipes. - command_pipe_t *command_pipes; - - // Synchronisation of accesses to shared thread data. - mutex_t threads_sync; - - // Global repository of pipes. It's used only on terminal shutdown - // to deallocate all the pipes irrespective of whether they are - // referenced from pipe_reader, pipe_writer or both. - struct pipe_info_t - { - class pipe_t *pipe; - class pipe_reader_t *reader; - class pipe_writer_t *writer; - }; - typedef std::vector pipes_t; - pipes_t pipes; - - // Synchronisation of access to global repository of pipes. - mutex_t pipes_sync; - - // Global repository of available inproc endpoints. - typedef std::map inproc_endpoints_t; - inproc_endpoints_t inproc_endpoints; - - // Synchronisation of access to the global repository - // of inproc endpoints. 
- mutex_t inproc_endpoint_sync; - - dispatcher_t (const dispatcher_t&); - void operator = (const dispatcher_t&); - }; - -} - -#endif - diff --git a/src/io_thread.cpp b/src/io_thread.cpp index 045627c..162ed4c 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -29,13 +29,13 @@ #include "select.hpp" #include "devpoll.hpp" #include "kqueue.hpp" -#include "dispatcher.hpp" +#include "context.hpp" #include "session.hpp" #include "simple_semaphore.hpp" #include "session.hpp" -zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : - object_t (dispatcher_, thread_slot_) +zmq::io_thread_t::io_thread_t (context_t *context_, int thread_slot_) : + object_t (context_, thread_slot_) { #if defined ZMQ_FORCE_SELECT poller = new select_t; @@ -131,7 +131,7 @@ void zmq::io_thread_t::in_event () // Read all the commands from particular thread. command_t cmd; - while (dispatcher->read (source_thread_slot, thread_slot, &cmd)) + while (context->read (source_thread_slot, thread_slot, &cmd)) cmd.destination->process_command (cmd); } } diff --git a/src/io_thread.hpp b/src/io_thread.hpp index afb8110..585a28b 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -38,7 +38,7 @@ namespace zmq { public: - io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); + io_thread_t (class context_t *context_, int thread_slot_); // Launch the physical thread. void start (); diff --git a/src/object.cpp b/src/object.cpp index a9370ab..7c85212 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -18,7 +18,7 @@ */ #include "object.hpp" -#include "dispatcher.hpp" +#include "context.hpp" #include "err.hpp" #include "pipe_reader.hpp" #include "pipe_writer.hpp" @@ -27,14 +27,14 @@ #include "simple_semaphore.hpp" #include "i_engine.hpp" -zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) : - dispatcher (dispatcher_), +zmq::object_t::object_t (context_t *context_, int thread_slot_) : + context (context_), thread_slot (thread_slot_) { } zmq::object_t::object_t (object_t *parent_) : - dispatcher (parent_->dispatcher), + context (parent_->context), thread_slot (parent_->thread_slot) { } @@ -45,7 +45,7 @@ zmq::object_t::~object_t () int zmq::object_t::thread_slot_count () { - return dispatcher->thread_slot_count (); + return context->thread_slot_count (); } int zmq::object_t::get_thread_slot () @@ -107,34 +107,34 @@ void zmq::object_t::create_pipe (object_t *reader_parent_, object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, pipe_reader_t **reader_, pipe_writer_t **writer_) { - dispatcher->create_pipe (reader_parent_, writer_parent_, hwm_, lwm_, + context->create_pipe (reader_parent_, writer_parent_, hwm_, lwm_, reader_, writer_); } void zmq::object_t::destroy_pipe (pipe_t *pipe_) { - dispatcher->destroy_pipe (pipe_); + context->destroy_pipe (pipe_); } int zmq::object_t::register_inproc_endpoint (const char *endpoint_, session_t *session_) { - return dispatcher->register_inproc_endpoint (endpoint_, session_); + return context->register_inproc_endpoint (endpoint_, session_); } zmq::object_t *zmq::object_t::get_inproc_endpoint (const char *endpoint_) { - return dispatcher->get_inproc_endpoint (endpoint_); + return context->get_inproc_endpoint (endpoint_); } void zmq::object_t::unregister_inproc_endpoints (session_t *session_) { - dispatcher->unregister_inproc_endpoints (session_); + context->unregister_inproc_endpoints (session_); } zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) { - return dispatcher->choose_io_thread (taskset_); + return 
context->choose_io_thread (taskset_); } void zmq::object_t::send_stop () @@ -144,7 +144,7 @@ void zmq::object_t::send_stop () command_t cmd; cmd.destination = this; cmd.type = command_t::stop; - dispatcher->write (thread_slot, thread_slot, cmd); + context->write (thread_slot, thread_slot, cmd); } void zmq::object_t::send_bind (object_t *destination_, pipe_reader_t *reader_, @@ -289,6 +289,6 @@ void zmq::object_t::send_command (command_t &cmd_) if (destination_thread_slot == thread_slot) cmd_.destination->process_command (cmd_); else - dispatcher->write (thread_slot, destination_thread_slot, cmd_); + context->write (thread_slot, destination_thread_slot, cmd_); } diff --git a/src/object.hpp b/src/object.hpp index b2ae334..796e7fa 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -32,7 +32,7 @@ namespace zmq { public: - object_t (class dispatcher_t *dispatcher_, int thread_slot_); + object_t (class context_t *context_, int thread_slot_); object_t (object_t *parent_); ~object_t (); @@ -42,7 +42,7 @@ namespace zmq protected: // Derived object can use following functions to interact with - // global repositories. See dispatcher.hpp for function details. + // global repositories. See context.hpp for function details. int thread_slot_count (); void create_pipe (class object_t *reader_parent_, class object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, @@ -87,7 +87,7 @@ namespace zmq virtual void process_terminate_ack (); // Pointer to the root of the infrastructure. - class dispatcher_t *dispatcher; + class context_t *context; // Slot ID of the thread the object belongs to. int thread_slot; diff --git a/src/pipe.hpp b/src/pipe.hpp index 16ac837..8894a22 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -32,10 +32,10 @@ namespace zmq class pipe_t : public ypipe_t { - // Dispatcher is a friend so that it can create & destroy the pipes. + // Context is a friend so that it can create & destroy the pipes. // By making constructor & destructor private we are sure that nobody - // except dispatcher messes with pipes. - friend class dispatcher_t; + // except context messes with pipes. + friend class context_t; private: @@ -45,7 +45,7 @@ namespace zmq void set_index (int index_); int get_index (); - // Index of the pipe in dispatcher's array of pipes. + // Index of the pipe in context's array of pipes. int index; pipe_t (const pipe_t&); diff --git a/src/pipe_reader.cpp b/src/pipe_reader.cpp index eea1371..79dfe2e 100644 --- a/src/pipe_reader.cpp +++ b/src/pipe_reader.cpp @@ -113,6 +113,6 @@ void zmq::pipe_reader_t::terminate () void zmq::pipe_reader_t::process_terminate_ack () { - // Ask dispatcher to deallocate the pipe. + // Ask context to deallocate the pipe. destroy_pipe (pipe); } diff --git a/src/pipe_reader.hpp b/src/pipe_reader.hpp index 4f85988..cf45bb4 100644 --- a/src/pipe_reader.hpp +++ b/src/pipe_reader.hpp @@ -28,10 +28,10 @@ namespace zmq class pipe_reader_t : public object_t { - // Dispatcher is a friend so that it can create & destroy the reader. + // Context is a friend so that it can create & destroy the reader. // By making constructor & destructor private we are sure that nobody - // except dispatcher messes with readers. - friend class dispatcher_t; + // except context messes with readers. 
+ friend class context_t; public: diff --git a/src/pipe_writer.hpp b/src/pipe_writer.hpp index 2c5132e..a727b1f 100644 --- a/src/pipe_writer.hpp +++ b/src/pipe_writer.hpp @@ -28,10 +28,10 @@ namespace zmq class pipe_writer_t : public object_t { - // Dispatcher is a friend so that it can create & destroy the writer. + // Context is a friend so that it can create & destroy the writer. // By making constructor & destructor private we are sure that nobody - // except dispatcher messes with writers. - friend class dispatcher_t; + // except context messes with writers. + friend class context_t; public: diff --git a/src/safe_object.cpp b/src/safe_object.cpp index 5a5ab8b..d4a92d7 100644 --- a/src/safe_object.cpp +++ b/src/safe_object.cpp @@ -19,9 +19,9 @@ #include "safe_object.hpp" -zmq::safe_object_t::safe_object_t (class dispatcher_t *dispatcher_, - int thread_slot_) : - object_t (dispatcher_, thread_slot_), +zmq::safe_object_t::safe_object_t (class context_t *context_, + int thread_slot_) : + object_t (context_, thread_slot_), processed_seqnum (0), terminating (false) { diff --git a/src/safe_object.hpp b/src/safe_object.hpp index 8bdd41c..b47db48 100644 --- a/src/safe_object.hpp +++ b/src/safe_object.hpp @@ -36,7 +36,7 @@ namespace zmq { public: - safe_object_t (class dispatcher_t *dispatcher_, int thread_slot_); + safe_object_t (class context_t *context_, int thread_slot_); safe_object_t (object_t *parent_); void inc_seqnum (); diff --git a/src/zmq.cpp b/src/zmq.cpp index a7fd486..0fb6fe1 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -25,7 +25,7 @@ #include "i_api.hpp" #include "err.hpp" -#include "dispatcher.hpp" +#include "context.hpp" #include "msg.hpp" int zmq_msg_init (zmq_msg *msg_) @@ -162,28 +162,27 @@ int zmq_msg_type (zmq_msg *msg_) void *zmq_init (int app_threads_, int io_threads_) { - // There should be at least a single thread managed by the dispatcher. + // There should be at least a single thread managed by the context. if (app_threads_ < 0 || io_threads_ < 0 || app_threads_ + io_threads_ == 0) { errno = EINVAL; return NULL; } - zmq::dispatcher_t *dispatcher = - new zmq::dispatcher_t (app_threads_, io_threads_); - zmq_assert (dispatcher); - return (void*) dispatcher; + zmq::context_t *context = new zmq::context_t (app_threads_, io_threads_); + zmq_assert (context); + return (void*) context; } int zmq_term (void *context_) { - ((zmq::dispatcher_t*) context_)->shutdown (); + ((zmq::context_t*) context_)->shutdown (); return 0; } void *zmq_socket (void *context_, int type_) { - return (void*) (((zmq::dispatcher_t*) context_)->create_socket (type_)); + return (void*) (((zmq::context_t*) context_)->create_socket (type_)); } int zmq_close (void *s_) -- cgit v1.2.3
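
The heart of the renamed class is the slot-addressed command matrix declared in src/context.hpp above: thread slot source_ reaches thread slot destination_ through the pipe at index source_ * signalers.size () + destination_, and the receiver's signaler is raised only when the flush fails. Below is a minimal, self-contained sketch of that addressing scheme only; the names (command_matrix_t, the body of command_t) are illustrative, and an unsynchronised std::deque stands in for the real lock-free ypipe_t and the signaler machinery.

    #include <cstddef>
    #include <deque>
    #include <vector>

    //  Stand-in for the real command_t; only the addressing matters here.
    struct command_t
    {
        int type;
    };

    //  N x N matrix of command queues: element [source * N + destination]
    //  carries commands from thread slot 'source' to thread slot 'destination'.
    class command_matrix_t
    {
    public:

        command_matrix_t (std::size_t slots_) :
            slots (slots_),
            pipes (slots_ * slots_)
        {
        }

        //  Counterpart of context_t::write, minus the signaler wake-up.
        void write (std::size_t source_, std::size_t destination_,
            const command_t &command_)
        {
            pipes [source_ * slots + destination_].push_back (command_);
        }

        //  Counterpart of context_t::read; false means no command is pending.
        bool read (std::size_t source_, std::size_t destination_,
            command_t *command_)
        {
            std::deque <command_t> &pipe =
                pipes [source_ * slots + destination_];
            if (pipe.empty ())
                return false;
            *command_ = pipe.front ();
            pipe.pop_front ();
            return true;
        }

    private:

        std::size_t slots;
        std::vector <std::deque <command_t> > pipes;
    };

With four thread slots, a command written by slot 0 for slot 3 lands at index 0 * 4 + 3 and is drained when slot 3 walks its incoming pipes in process_commands, exactly as app_thread_t and io_thread_t do above.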
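
choose_io_thread in src/context.cpp above picks the least-loaded I/O thread among those marked eligible by the taskset bitmask: bit i enables thread i, and a zero mask enables every thread. A rough standalone rendering of that selection rule, assuming a plain vector of load counters in place of io_thread_t::get_load ():

    #include <stdint.h>
    #include <cstdio>
    #include <vector>

    //  Index of the least-loaded thread among those enabled by taskset_.
    //  Bit i of taskset_ marks thread i as eligible; zero means all threads.
    std::size_t choose_io_thread (const std::vector <int> &loads_,
        uint64_t taskset_)
    {
        int min_load = loads_ [0];
        std::size_t result = 0;
        for (std::size_t i = 1; i != loads_.size (); i++) {
            if (!taskset_ || (taskset_ & (uint64_t (1) << i))) {
                if (loads_ [i] < min_load) {
                    min_load = loads_ [i];
                    result = i;
                }
            }
        }
        return result;
    }

    int main ()
    {
        std::vector <int> loads;
        loads.push_back (3);
        loads.push_back (1);
        loads.push_back (2);

        //  Threads 0 and 2 are eligible (bits 0 and 2 set), so thread 2 wins
        //  even though thread 1 carries the smallest load.
        std::printf ("%d\n", (int) choose_io_thread (loads, 1 | 4));
        return 0;
    }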
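
Nothing changes for callers: zmq_init, zmq_socket, zmq_close and zmq_term in src/zmq.cpp keep their signatures and simply hand off to context_t instead of dispatcher_t. A rough usage sketch under that assumption follows; ZMQ_PUB is assumed to be one of the socket type constants exported by zmq.h at this point in the tree, and error handling is reduced to asserts.

    #include <zmq.h>
    #include <cassert>

    int main ()
    {
        //  One application thread slot and one I/O thread slot; zmq_init
        //  fails with EINVAL if either count is negative or both are zero.
        void *ctx = zmq_init (1, 1);
        assert (ctx);

        //  Forwards to context_t::create_socket, which binds the calling
        //  thread to a free app_thread_t slot and fails with EMFILE once
        //  all application thread slots are taken.
        void *s = zmq_socket (ctx, ZMQ_PUB);    //  ZMQ_PUB: assumed constant
        assert (s);

        zmq_close (s);

        //  Maps to context_t::shutdown (), which deletes the context.
        zmq_term (ctx);
        return 0;
    }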