diff options
-rw-r--r-- | src/Makefile.am | 4 | ||||
-rw-r--r-- | src/app_thread.cpp | 8 | ||||
-rw-r--r-- | src/app_thread.hpp | 4 | ||||
-rw-r--r-- | src/context.cpp (renamed from src/dispatcher.cpp) | 28 | ||||
-rw-r--r-- | src/context.hpp (renamed from src/dispatcher.hpp) | 24 | ||||
-rw-r--r-- | src/io_thread.cpp | 8 | ||||
-rw-r--r-- | src/io_thread.hpp | 2 | ||||
-rw-r--r-- | src/object.cpp | 26 | ||||
-rw-r--r-- | src/object.hpp | 6 | ||||
-rw-r--r-- | src/pipe.hpp | 8 | ||||
-rw-r--r-- | src/pipe_reader.cpp | 2 | ||||
-rw-r--r-- | src/pipe_reader.hpp | 6 | ||||
-rw-r--r-- | src/pipe_writer.hpp | 6 | ||||
-rw-r--r-- | src/safe_object.cpp | 6 | ||||
-rw-r--r-- | src/safe_object.hpp | 2 | ||||
-rw-r--r-- | src/zmq.cpp | 15 |
16 files changed, 77 insertions, 78 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index e6d09ca..27f4412 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -8,10 +8,10 @@ libzmq_la_SOURCES = \ command.hpp \ config.hpp \ connecter.hpp \ + context.hpp \ data_distributor.hpp \ decoder.hpp \ devpoll.hpp \ - dispatcher.hpp \ dummy_aggregator.hpp \ dummy_distributor.hpp \ encoder.hpp \ @@ -70,9 +70,9 @@ libzmq_la_SOURCES = \ zmq_tcp_engine.hpp \ app_thread.cpp \ connecter.cpp \ + context.cpp \ data_distributor.cpp \ devpoll.hpp \ - dispatcher.cpp \ dummy_aggregator.cpp \ dummy_distributor.cpp \ epoll.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 2406dbd..9cc61c7 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -26,7 +26,7 @@ #endif #include "app_thread.hpp" -#include "dispatcher.hpp" +#include "context.hpp" #include "err.hpp" #include "session.hpp" #include "pipe.hpp" @@ -51,8 +51,8 @@ #define ZMQ_DELAY_COMMANDS #endif -zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : - object_t (dispatcher_, thread_slot_), +zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) : + object_t (context_, thread_slot_), tid (0), last_processing_time (0) { @@ -213,7 +213,7 @@ void zmq::app_thread_t::process_commands (bool block_) for (int i = 0; i != thread_slot_count (); i++) { if (signals & (ypollset_t::signals_t (1) << i)) { command_t cmd; - while (dispatcher->read (i, get_thread_slot (), &cmd)) + while (context->read (i, get_thread_slot (), &cmd)) cmd.destination->process_command (cmd); } } diff --git a/src/app_thread.hpp b/src/app_thread.hpp index ffe5596..8295c2f 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -34,7 +34,7 @@ namespace zmq { public: - app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); + app_thread_t (class context_t *context_, int thread_slot_); // To be called when the whole infrastrucure is being closed. void shutdown (); @@ -47,7 +47,7 @@ namespace zmq struct i_api *create_socket (int type_); // Nota bene: The following two functions are accessed from different - // threads. The caller (dispatcher) is responsible for synchronisation + // threads. The caller (context) is responsible for synchronisation // of accesses. // Returns true is current thread is associated with the app thread. diff --git a/src/dispatcher.cpp b/src/context.cpp index 56a5e0b..ab4643e 100644 --- a/src/dispatcher.cpp +++ b/src/context.cpp @@ -19,7 +19,7 @@ #include "../include/zmq.h" -#include "dispatcher.hpp" +#include "context.hpp" #include "app_thread.hpp" #include "io_thread.hpp" #include "platform.hpp" @@ -34,7 +34,7 @@ #include "windows.h" #endif -zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) +zmq::context_t::context_t (int app_threads_, int io_threads_) { #ifdef ZMQ_HAVE_WINDOWS // Intialise Windows sockets. Note that WSAStartup can be called multiple @@ -72,12 +72,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) io_threads [i]->start (); } -void zmq::dispatcher_t::shutdown () +void zmq::context_t::shutdown () { delete this; } -zmq::dispatcher_t::~dispatcher_t () +zmq::context_t::~context_t () { // Ask I/O threads to terminate. for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) @@ -110,12 +110,12 @@ zmq::dispatcher_t::~dispatcher_t () #endif } -int zmq::dispatcher_t::thread_slot_count () +int zmq::context_t::thread_slot_count () { return signalers.size (); } -zmq::i_api *zmq::dispatcher_t::create_socket (int type_) +zmq::i_api *zmq::context_t::create_socket (int type_) { threads_sync.lock (); app_thread_t *thread = choose_app_thread (); @@ -128,14 +128,14 @@ zmq::i_api *zmq::dispatcher_t::create_socket (int type_) return s; } -zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () +zmq::app_thread_t *zmq::context_t::choose_app_thread () { // Check whether thread ID is already assigned. If so, return it. for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) if (app_threads [i]->is_current ()) return app_threads [i]; - // Check whether there's an unused thread slot in the dispatcher. + // Check whether there's an unused thread slot in the cotext. for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) if (app_threads [i]->make_current ()) return app_threads [i]; @@ -145,7 +145,7 @@ zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () return NULL; } -zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_) +zmq::io_thread_t *zmq::context_t::choose_io_thread (uint64_t taskset_) { zmq_assert (io_threads.size () > 0); @@ -165,7 +165,7 @@ zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_) return io_threads [result]; } -void zmq::dispatcher_t::create_pipe (object_t *reader_parent_, +void zmq::context_t::create_pipe (object_t *reader_parent_, object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, pipe_reader_t **reader_, pipe_writer_t **writer_) { @@ -191,7 +191,7 @@ void zmq::dispatcher_t::create_pipe (object_t *reader_parent_, *writer_ = writer; } -void zmq::dispatcher_t::destroy_pipe (pipe_t *pipe_) +void zmq::context_t::destroy_pipe (pipe_t *pipe_) { // Remove the pipe from the repository. pipe_info_t info; @@ -209,7 +209,7 @@ void zmq::dispatcher_t::destroy_pipe (pipe_t *pipe_) delete info.writer; } -int zmq::dispatcher_t::register_inproc_endpoint (const char *endpoint_, +int zmq::context_t::register_inproc_endpoint (const char *endpoint_, session_t *session_) { inproc_endpoint_sync.lock (); @@ -227,7 +227,7 @@ int zmq::dispatcher_t::register_inproc_endpoint (const char *endpoint_, return 0; } -zmq::object_t *zmq::dispatcher_t::get_inproc_endpoint (const char *endpoint_) +zmq::object_t *zmq::context_t::get_inproc_endpoint (const char *endpoint_) { inproc_endpoint_sync.lock (); inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_); @@ -245,7 +245,7 @@ zmq::object_t *zmq::dispatcher_t::get_inproc_endpoint (const char *endpoint_) return session; } -void zmq::dispatcher_t::unregister_inproc_endpoints (session_t *session_) +void zmq::context_t::unregister_inproc_endpoints (session_t *session_) { inproc_endpoint_sync.lock (); diff --git a/src/dispatcher.hpp b/src/context.hpp index 07c35cd..7701ef7 100644 --- a/src/dispatcher.hpp +++ b/src/context.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__ -#define __ZMQ_DISPATCHER_HPP_INCLUDED__ +#ifndef __ZMQ_CONTEXT_HPP_INCLUDED__ +#define __ZMQ_CONTEXT_HPP_INCLUDED__ #include <vector> #include <map> @@ -37,19 +37,19 @@ namespace zmq // Dispatcher implements bidirectional thread-safe passing of commands // between N threads. It consists of a ypipes to pass commands and // signalers to wake up the receiver thread when new commands are - // available. Note that dispatcher is inefficient for passing messages + // available. Note that context is inefficient for passing messages // within a thread (sender thread = receiver thread). The optimisation is // not part of the class and should be implemented by individual threads // (presumably by calling the command handling function directly). - class dispatcher_t + class context_t { public: - // Create the dispatcher object. Matrix of pipes to communicate between + // Create the context object. Matrix of pipes to communicate between // each socket and each I/O thread is created along with appropriate // signalers. - dispatcher_t (int app_threads_, int io_threads_); + context_t (int app_threads_, int io_threads_); // To be called to terminate the whole infrastructure (zmq_term). void shutdown (); @@ -57,12 +57,12 @@ namespace zmq // Create a socket engine. struct i_api *create_socket (int type_); - // Returns number of thread slots in the dispatcher. To be used by + // Returns number of thread slots in the context. To be used by // individual threads to find out how many distinct signals can be // received. int thread_slot_count (); - // Write command to the dispatcher. + // Send command from the source to the destination. inline void write (int source_, int destination_, const command_t &command_) { @@ -73,7 +73,7 @@ namespace zmq signalers [destination_]->signal (source_); } - // Read command from the dispatcher. Returns false if there is no + // Receive command from the source. Returns false if there is no // command available. inline bool read (int source_, int destination_, command_t *command_) { @@ -110,7 +110,7 @@ namespace zmq private: // Clean-up. - ~dispatcher_t (); + ~context_t (); // Returns the app thread associated with the current thread. // NULL if we are out of app thread slots. @@ -160,8 +160,8 @@ namespace zmq // of inproc endpoints. mutex_t inproc_endpoint_sync; - dispatcher_t (const dispatcher_t&); - void operator = (const dispatcher_t&); + context_t (const context_t&); + void operator = (const context_t&); }; } diff --git a/src/io_thread.cpp b/src/io_thread.cpp index 045627c..162ed4c 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -29,13 +29,13 @@ #include "select.hpp" #include "devpoll.hpp" #include "kqueue.hpp" -#include "dispatcher.hpp" +#include "context.hpp" #include "session.hpp" #include "simple_semaphore.hpp" #include "session.hpp" -zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : - object_t (dispatcher_, thread_slot_) +zmq::io_thread_t::io_thread_t (context_t *context_, int thread_slot_) : + object_t (context_, thread_slot_) { #if defined ZMQ_FORCE_SELECT poller = new select_t; @@ -131,7 +131,7 @@ void zmq::io_thread_t::in_event () // Read all the commands from particular thread. command_t cmd; - while (dispatcher->read (source_thread_slot, thread_slot, &cmd)) + while (context->read (source_thread_slot, thread_slot, &cmd)) cmd.destination->process_command (cmd); } } diff --git a/src/io_thread.hpp b/src/io_thread.hpp index afb8110..585a28b 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -38,7 +38,7 @@ namespace zmq { public: - io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); + io_thread_t (class context_t *context_, int thread_slot_); // Launch the physical thread. void start (); diff --git a/src/object.cpp b/src/object.cpp index a9370ab..7c85212 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -18,7 +18,7 @@ */ #include "object.hpp" -#include "dispatcher.hpp" +#include "context.hpp" #include "err.hpp" #include "pipe_reader.hpp" #include "pipe_writer.hpp" @@ -27,14 +27,14 @@ #include "simple_semaphore.hpp" #include "i_engine.hpp" -zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) : - dispatcher (dispatcher_), +zmq::object_t::object_t (context_t *context_, int thread_slot_) : + context (context_), thread_slot (thread_slot_) { } zmq::object_t::object_t (object_t *parent_) : - dispatcher (parent_->dispatcher), + context (parent_->context), thread_slot (parent_->thread_slot) { } @@ -45,7 +45,7 @@ zmq::object_t::~object_t () int zmq::object_t::thread_slot_count () { - return dispatcher->thread_slot_count (); + return context->thread_slot_count (); } int zmq::object_t::get_thread_slot () @@ -107,34 +107,34 @@ void zmq::object_t::create_pipe (object_t *reader_parent_, object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, pipe_reader_t **reader_, pipe_writer_t **writer_) { - dispatcher->create_pipe (reader_parent_, writer_parent_, hwm_, lwm_, + context->create_pipe (reader_parent_, writer_parent_, hwm_, lwm_, reader_, writer_); } void zmq::object_t::destroy_pipe (pipe_t *pipe_) { - dispatcher->destroy_pipe (pipe_); + context->destroy_pipe (pipe_); } int zmq::object_t::register_inproc_endpoint (const char *endpoint_, session_t *session_) { - return dispatcher->register_inproc_endpoint (endpoint_, session_); + return context->register_inproc_endpoint (endpoint_, session_); } zmq::object_t *zmq::object_t::get_inproc_endpoint (const char *endpoint_) { - return dispatcher->get_inproc_endpoint (endpoint_); + return context->get_inproc_endpoint (endpoint_); } void zmq::object_t::unregister_inproc_endpoints (session_t *session_) { - dispatcher->unregister_inproc_endpoints (session_); + context->unregister_inproc_endpoints (session_); } zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) { - return dispatcher->choose_io_thread (taskset_); + return context->choose_io_thread (taskset_); } void zmq::object_t::send_stop () @@ -144,7 +144,7 @@ void zmq::object_t::send_stop () command_t cmd; cmd.destination = this; cmd.type = command_t::stop; - dispatcher->write (thread_slot, thread_slot, cmd); + context->write (thread_slot, thread_slot, cmd); } void zmq::object_t::send_bind (object_t *destination_, pipe_reader_t *reader_, @@ -289,6 +289,6 @@ void zmq::object_t::send_command (command_t &cmd_) if (destination_thread_slot == thread_slot) cmd_.destination->process_command (cmd_); else - dispatcher->write (thread_slot, destination_thread_slot, cmd_); + context->write (thread_slot, destination_thread_slot, cmd_); } diff --git a/src/object.hpp b/src/object.hpp index b2ae334..796e7fa 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -32,7 +32,7 @@ namespace zmq { public: - object_t (class dispatcher_t *dispatcher_, int thread_slot_); + object_t (class context_t *context_, int thread_slot_); object_t (object_t *parent_); ~object_t (); @@ -42,7 +42,7 @@ namespace zmq protected: // Derived object can use following functions to interact with - // global repositories. See dispatcher.hpp for function details. + // global repositories. See context.hpp for function details. int thread_slot_count (); void create_pipe (class object_t *reader_parent_, class object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, @@ -87,7 +87,7 @@ namespace zmq virtual void process_terminate_ack (); // Pointer to the root of the infrastructure. - class dispatcher_t *dispatcher; + class context_t *context; // Slot ID of the thread the object belongs to. int thread_slot; diff --git a/src/pipe.hpp b/src/pipe.hpp index 16ac837..8894a22 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -32,10 +32,10 @@ namespace zmq class pipe_t : public ypipe_t <zmq_msg, false, message_pipe_granularity> { - // Dispatcher is a friend so that it can create & destroy the pipes. + // Context is a friend so that it can create & destroy the pipes. // By making constructor & destructor private we are sure that nobody - // except dispatcher messes with pipes. - friend class dispatcher_t; + // except context messes with pipes. + friend class context_t; private: @@ -45,7 +45,7 @@ namespace zmq void set_index (int index_); int get_index (); - // Index of the pipe in dispatcher's array of pipes. + // Index of the pipe in context's array of pipes. int index; pipe_t (const pipe_t&); diff --git a/src/pipe_reader.cpp b/src/pipe_reader.cpp index eea1371..79dfe2e 100644 --- a/src/pipe_reader.cpp +++ b/src/pipe_reader.cpp @@ -113,6 +113,6 @@ void zmq::pipe_reader_t::terminate () void zmq::pipe_reader_t::process_terminate_ack () { - // Ask dispatcher to deallocate the pipe. + // Ask context to deallocate the pipe. destroy_pipe (pipe); } diff --git a/src/pipe_reader.hpp b/src/pipe_reader.hpp index 4f85988..cf45bb4 100644 --- a/src/pipe_reader.hpp +++ b/src/pipe_reader.hpp @@ -28,10 +28,10 @@ namespace zmq class pipe_reader_t : public object_t { - // Dispatcher is a friend so that it can create & destroy the reader. + // Context is a friend so that it can create & destroy the reader. // By making constructor & destructor private we are sure that nobody - // except dispatcher messes with readers. - friend class dispatcher_t; + // except context messes with readers. + friend class context_t; public: diff --git a/src/pipe_writer.hpp b/src/pipe_writer.hpp index 2c5132e..a727b1f 100644 --- a/src/pipe_writer.hpp +++ b/src/pipe_writer.hpp @@ -28,10 +28,10 @@ namespace zmq class pipe_writer_t : public object_t { - // Dispatcher is a friend so that it can create & destroy the writer. + // Context is a friend so that it can create & destroy the writer. // By making constructor & destructor private we are sure that nobody - // except dispatcher messes with writers. - friend class dispatcher_t; + // except context messes with writers. + friend class context_t; public: diff --git a/src/safe_object.cpp b/src/safe_object.cpp index 5a5ab8b..d4a92d7 100644 --- a/src/safe_object.cpp +++ b/src/safe_object.cpp @@ -19,9 +19,9 @@ #include "safe_object.hpp" -zmq::safe_object_t::safe_object_t (class dispatcher_t *dispatcher_, - int thread_slot_) : - object_t (dispatcher_, thread_slot_), +zmq::safe_object_t::safe_object_t (class context_t *context_, + int thread_slot_) : + object_t (context_, thread_slot_), processed_seqnum (0), terminating (false) { diff --git a/src/safe_object.hpp b/src/safe_object.hpp index 8bdd41c..b47db48 100644 --- a/src/safe_object.hpp +++ b/src/safe_object.hpp @@ -36,7 +36,7 @@ namespace zmq { public: - safe_object_t (class dispatcher_t *dispatcher_, int thread_slot_); + safe_object_t (class context_t *context_, int thread_slot_); safe_object_t (object_t *parent_); void inc_seqnum (); diff --git a/src/zmq.cpp b/src/zmq.cpp index a7fd486..0fb6fe1 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -25,7 +25,7 @@ #include "i_api.hpp" #include "err.hpp" -#include "dispatcher.hpp" +#include "context.hpp" #include "msg.hpp" int zmq_msg_init (zmq_msg *msg_) @@ -162,28 +162,27 @@ int zmq_msg_type (zmq_msg *msg_) void *zmq_init (int app_threads_, int io_threads_) { - // There should be at least a single thread managed by the dispatcher. + // There should be at least a single thread managed by the context. if (app_threads_ < 0 || io_threads_ < 0 || app_threads_ + io_threads_ == 0) { errno = EINVAL; return NULL; } - zmq::dispatcher_t *dispatcher = - new zmq::dispatcher_t (app_threads_, io_threads_); - zmq_assert (dispatcher); - return (void*) dispatcher; + zmq::context_t *context = new zmq::context_t (app_threads_, io_threads_); + zmq_assert (context); + return (void*) context; } int zmq_term (void *context_) { - ((zmq::dispatcher_t*) context_)->shutdown (); + ((zmq::context_t*) context_)->shutdown (); return 0; } void *zmq_socket (void *context_, int type_) { - return (void*) (((zmq::dispatcher_t*) context_)->create_socket (type_)); + return (void*) (((zmq::context_t*) context_)->create_socket (type_)); } int zmq_close (void *s_) |