-rw-r--r--  src/Makefile.am                                |  4
-rw-r--r--  src/app_thread.cpp                             | 10
-rw-r--r--  src/app_thread.hpp                             |  2
-rw-r--r--  src/ctx.cpp (renamed from src/dispatcher.cpp)  | 30
-rw-r--r--  src/ctx.hpp (renamed from src/dispatcher.hpp)  | 23
-rw-r--r--  src/io_thread.cpp                              |  6
-rw-r--r--  src/io_thread.hpp                              |  2
-rw-r--r--  src/object.cpp                                 | 28
-rw-r--r--  src/object.hpp                                 | 10
-rw-r--r--  src/socket_base.cpp                            | 11
-rw-r--r--  src/zmq.cpp                                    | 17
-rw-r--r--  src/zmq_encoder.cpp                            |  2
12 files changed, 74 insertions, 71 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 2cd5ace..70ae248 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -55,9 +55,9 @@ libzmq_la_SOURCES = app_thread.hpp \
     blob.hpp \
     command.hpp \
     config.hpp \
+    ctx.hpp \
     decoder.hpp \
     devpoll.hpp \
-    dispatcher.hpp \
     downstream.hpp \
     encoder.hpp \
     epoll.hpp \
@@ -122,8 +122,8 @@ libzmq_la_SOURCES = app_thread.hpp \
     zmq_listener.hpp \
     app_thread.cpp \
     command.cpp \
+    ctx.cpp \
     devpoll.cpp \
-    dispatcher.cpp \
     downstream.cpp \
     epoll.cpp \
     err.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 1c06337..19f997b 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -34,7 +34,7 @@
 #endif
 
 #include "app_thread.hpp"
-#include "dispatcher.hpp"
+#include "ctx.hpp"
 #include "err.hpp"
 #include "pipe.hpp"
 #include "config.hpp"
@@ -57,9 +57,9 @@
 #define ZMQ_DELAY_COMMANDS
 #endif
 
-zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_,
+zmq::app_thread_t::app_thread_t (ctx_t *ctx_,
     uint32_t thread_slot_) :
-    object_t (dispatcher_, thread_slot_),
+    object_t (ctx_, thread_slot_),
     last_processing_time (0),
     terminated (false)
 {
@@ -163,7 +163,7 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
         break;
     default:
         if (sockets.empty ())
-            get_dispatcher ()->no_sockets (this);
+            get_ctx ()->no_sockets (this);
         errno = EINVAL;
         return NULL;
     }
@@ -178,7 +178,7 @@ void zmq::app_thread_t::remove_socket (socket_base_t *socket_)
 {
     sockets.erase (socket_);
     if (sockets.empty ())
-        get_dispatcher ()->no_sockets (this);
+        get_ctx ()->no_sockets (this);
 }
 
 void zmq::app_thread_t::process_stop ()
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index bca6947..f0deaab 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -34,7 +34,7 @@ namespace zmq
     {
     public:
 
-        app_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
+        app_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);
 
         ~app_thread_t ();
 
diff --git a/src/dispatcher.cpp b/src/ctx.cpp
index 2ae99ba..f0e177d 100644
--- a/src/dispatcher.cpp
+++ b/src/ctx.cpp
@@ -22,7 +22,7 @@
 
 #include "../include/zmq.h"
 
-#include "dispatcher.hpp"
+#include "ctx.hpp"
 #include "socket_base.hpp"
 #include "app_thread.hpp"
 #include "io_thread.hpp"
@@ -34,7 +34,7 @@
 #include "windows.h"
 #endif
 
-zmq::dispatcher_t::dispatcher_t (uint32_t io_threads_) :
+zmq::ctx_t::ctx_t (uint32_t io_threads_) :
     sockets (0),
     terminated (false)
 {
@@ -65,7 +65,7 @@
     }
 }
 
-int zmq::dispatcher_t::term ()
+int zmq::ctx_t::term ()
 {
     // First send stop command to application threads so that any
     // blocking calls are interrupted.
@@ -86,7 +86,7 @@
     return 0;
 }
 
-zmq::dispatcher_t::~dispatcher_t ()
+zmq::ctx_t::~ctx_t ()
 {
     // Ask I/O threads to terminate. If stop signal wasn't sent to I/O
     // thread subsequent invocation of destructor would hang-up.
@@ -117,7 +117,7 @@
 #endif
 }
 
-zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
+zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
 {
     app_threads_sync.lock ();
 
@@ -183,7 +183,7 @@
     return s;
 }
 
-void zmq::dispatcher_t::destroy_socket ()
+void zmq::ctx_t::destroy_socket ()
 {
     // If zmq_term was already called and there are no more sockets,
     // terminate the whole 0MQ infrastructure.
@@ -197,7 +197,7 @@ void zmq::dispatcher_t::destroy_socket ()
         delete this;
 }
 
-void zmq::dispatcher_t::no_sockets (app_thread_t *thread_)
+void zmq::ctx_t::no_sockets (app_thread_t *thread_)
 {
     app_threads_sync.lock ();
     app_threads_t::size_type i;
@@ -210,19 +210,19 @@
     app_threads_sync.unlock ();
 }
 
-void zmq::dispatcher_t::send_command (uint32_t destination_,
+void zmq::ctx_t::send_command (uint32_t destination_,
     const command_t &command_)
 {
     signalers [destination_]->send (command_);
 }
 
-bool zmq::dispatcher_t::recv_command (uint32_t thread_slot_,
+bool zmq::ctx_t::recv_command (uint32_t thread_slot_,
     command_t *command_, bool block_)
 {
     return signalers [thread_slot_]->recv (command_, block_);
 }
 
-zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_)
+zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
 {
     // Find the I/O thread with minimum load.
     zmq_assert (io_threads.size () > 0);
@@ -241,7 +241,7 @@
     return io_threads [result];
 }
 
-void zmq::dispatcher_t::register_pipe (class pipe_t *pipe_)
+void zmq::ctx_t::register_pipe (class pipe_t *pipe_)
 {
     pipes_sync.lock ();
     bool inserted = pipes.insert (pipe_).second;
@@ -249,7 +249,7 @@
     pipes_sync.unlock ();
 }
 
-void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_)
+void zmq::ctx_t::unregister_pipe (class pipe_t *pipe_)
 {
     pipes_sync.lock ();
     pipes_t::size_type erased = pipes.erase (pipe_);
@@ -257,7 +257,7 @@
     pipes_sync.unlock ();
 }
 
-int zmq::dispatcher_t::register_endpoint (const char *addr_,
+int zmq::ctx_t::register_endpoint (const char *addr_,
     socket_base_t *socket_)
 {
     endpoints_sync.lock ();
@@ -274,7 +274,7 @@
     return 0;
 }
 
-void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_)
+void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
 {
     endpoints_sync.lock ();
 
@@ -292,7 +292,7 @@
     endpoints_sync.unlock ();
 }
 
-zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_)
+zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_)
 {
     endpoints_sync.lock ();
 
diff --git a/src/dispatcher.hpp b/src/ctx.hpp
index cad4844..c96a923 100644
--- a/src/dispatcher.hpp
+++ b/src/ctx.hpp
@@ -17,8 +17,8 @@
     along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
 
-#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__
-#define __ZMQ_DISPATCHER_HPP_INCLUDED__
+#ifndef __ZMQ_CTX_HPP_INCLUDED__
+#define __ZMQ_CTX_HPP_INCLUDED__
 
 #include <vector>
 #include <set>
@@ -34,14 +34,17 @@
 namespace zmq
 {
 
-    class dispatcher_t
+    // Context object encapsulates all the global state associated with
+    // the library.
+
+    class ctx_t
     {
     public:
 
-        // Create the dispatcher object. The argument specifies the size
+        // Create the context object. The argument specifies the size
         // of I/O thread pool to create.
-        dispatcher_t (uint32_t io_threads_);
+        ctx_t (uint32_t io_threads_);
 
         // This function is called when user invokes zmq_term. If there are
         // no more sockets open it'll cause all the infrastructure to be shut
@@ -70,7 +73,7 @@ namespace zmq
         // Taskset specifies which I/O threads are eligible (0 = all).
         class io_thread_t *choose_io_thread (uint64_t taskset_);
 
-        // All pipes are registered with the dispatcher so that even the
+        // All pipes are registered with the context so that even the
        // orphaned pipes can be deallocated on the terminal shutdown.
         void register_pipe (class pipe_t *pipe_);
         void unregister_pipe (class pipe_t *pipe_);
@@ -82,7 +85,7 @@
 
     private:
 
-        ~dispatcher_t ();
+        ~ctx_t ();
 
         struct app_thread_info_t
         {
@@ -116,7 +119,7 @@
         // As pipes may reside in orphaned state in particular moments
         // of the pipe shutdown process, i.e. neither pipe reader nor
         // pipe writer hold reference to the pipe, we have to hold references
-        // to all pipes in dispatcher so that we can deallocate them
+        // to all pipes in context so that we can deallocate them
         // during terminal shutdown even though it conincides with the
         // pipe being in the orphaned state.
         typedef std::set <class pipe_t*> pipes_t;
@@ -143,8 +146,8 @@
         // Synchronisation of access to the list of inproc endpoints.
         mutex_t endpoints_sync;
 
-        dispatcher_t (const dispatcher_t&);
-        void operator = (const dispatcher_t&);
+        ctx_t (const ctx_t&);
+        void operator = (const ctx_t&);
     };
 
 }
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index 92c314a..fac6961 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -24,11 +24,11 @@
 #include "io_thread.hpp"
 #include "platform.hpp"
 #include "err.hpp"
-#include "dispatcher.hpp"
+#include "ctx.hpp"
 
-zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_,
+zmq::io_thread_t::io_thread_t (ctx_t *ctx_,
     uint32_t thread_slot_) :
-    object_t (dispatcher_, thread_slot_)
+    object_t (ctx_, thread_slot_)
 {
     poller = new (std::nothrow) poller_t;
     zmq_assert (poller);
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index 7e105b3..3d832c0 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -38,7 +38,7 @@ namespace zmq
     {
     public:
 
-        io_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
+        io_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);
 
         // Clean-up. If the thread was started, it's neccessary to call 'stop'
         // before invoking destructor. Otherwise the destructor would hang up.
diff --git a/src/object.cpp b/src/object.cpp
index c5c89cb..324450f 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -20,7 +20,7 @@
 #include <string.h>
 
 #include "object.hpp"
-#include "dispatcher.hpp"
+#include "ctx.hpp"
 #include "err.hpp"
 #include "pipe.hpp"
 #include "io_thread.hpp"
@@ -28,14 +28,14 @@
 #include "session.hpp"
 #include "socket_base.hpp"
 
-zmq::object_t::object_t (dispatcher_t *dispatcher_, uint32_t thread_slot_) :
-    dispatcher (dispatcher_),
+zmq::object_t::object_t (ctx_t *ctx_, uint32_t thread_slot_) :
+    ctx (ctx_),
     thread_slot (thread_slot_)
 {
 }
 
 zmq::object_t::object_t (object_t *parent_) :
-    dispatcher (parent_->dispatcher),
+    ctx (parent_->ctx),
     thread_slot (parent_->thread_slot)
 {
 }
@@ -49,9 +49,9 @@
     return thread_slot;
 }
 
-zmq::dispatcher_t *zmq::object_t::get_dispatcher ()
+zmq::ctx_t *zmq::object_t::get_ctx ()
 {
-    return dispatcher;
+    return ctx;
 }
 
 void zmq::object_t::process_command (command_t &cmd_)
@@ -125,32 +125,32 @@
 void zmq::object_t::register_pipe (class pipe_t *pipe_)
 {
-    dispatcher->register_pipe (pipe_);
+    ctx->register_pipe (pipe_);
 }
 
 void zmq::object_t::unregister_pipe (class pipe_t *pipe_)
 {
-    dispatcher->unregister_pipe (pipe_);
+    ctx->unregister_pipe (pipe_);
 }
 
 int zmq::object_t::register_endpoint (const char *addr_,
     socket_base_t *socket_)
 {
-    return dispatcher->register_endpoint (addr_, socket_);
+    return ctx->register_endpoint (addr_, socket_);
 }
 
 void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
 {
-    return dispatcher->unregister_endpoints (socket_);
+    return ctx->unregister_endpoints (socket_);
 }
 
 zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_)
 {
-    return dispatcher->find_endpoint (addr_);
+    return ctx->find_endpoint (addr_);
 }
 
 zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
 {
-    return dispatcher->choose_io_thread (taskset_);
+    return ctx->choose_io_thread (taskset_);
 }
 
 void zmq::object_t::send_stop ()
@@ -160,7 +160,7 @@ void zmq::object_t::send_stop ()
     command_t cmd;
     cmd.destination = this;
     cmd.type = command_t::stop;
-    dispatcher->send_command (thread_slot, cmd);
+    ctx->send_command (thread_slot, cmd);
 }
 
 void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_)
@@ -369,6 +369,6 @@ void zmq::object_t::process_seqnum ()
 
 void zmq::object_t::send_command (command_t &cmd_)
 {
-    dispatcher->send_command (cmd_.destination->get_thread_slot (), cmd_);
+    ctx->send_command (cmd_.destination->get_thread_slot (), cmd_);
 }
 
diff --git a/src/object.hpp b/src/object.hpp
index 0084e1a..a38b0a6 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -32,15 +32,15 @@ namespace zmq
     {
     public:
 
-        object_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
+        object_t (class ctx_t *ctx_, uint32_t thread_slot_);
         object_t (object_t *parent_);
         virtual ~object_t ();
 
         uint32_t get_thread_slot ();
-        dispatcher_t *get_dispatcher ();
+        ctx_t *get_ctx ();
         void process_command (struct command_t &cmd_);
 
-        // Allow pipe to access corresponding dispatcher functions.
+        // Allow pipe to access corresponding context functions.
         void register_pipe (class pipe_t *pipe_);
         void unregister_pipe (class pipe_t *pipe_);
 
@@ -101,8 +101,8 @@
 
     private:
 
-        // Pointer to the root of the infrastructure.
-        class dispatcher_t *dispatcher;
+        // Context provides access to the global state.
+        class ctx_t *ctx;
 
         // Slot ID of the thread the object belongs to.
         uint32_t thread_slot;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index b186683..e946526 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -25,7 +25,7 @@
 
 #include "socket_base.hpp"
 #include "app_thread.hpp"
-#include "dispatcher.hpp"
+
 #include "zmq_listener.hpp"
 #include "zmq_connecter.hpp"
 #include "io_thread.hpp"
@@ -34,6 +34,7 @@
 #include "owned.hpp"
 #include "pipe.hpp"
 #include "err.hpp"
+#include "ctx.hpp"
 #include "platform.hpp"
 #include "pgm_sender.hpp"
 #include "pgm_receiver.hpp"
@@ -456,14 +457,14 @@
     // Let the thread know that the socket is no longer available.
     app_thread->remove_socket (this);
 
-    // Pointer to the dispatcher must be retrieved before the socket is
+    // Pointer to the context must be retrieved before the socket is
     // deallocated. Afterwards it is not available.
-    dispatcher_t *dispatcher = get_dispatcher ();
+    ctx_t *ctx = get_ctx ();
 
     // Unregister all inproc endpoints associated with this socket.
     // From this point we are sure that inc_seqnum won't be called again
     // on this object.
-    dispatcher->unregister_endpoints (this);
+    ctx->unregister_endpoints (this);
 
     // Wait till all undelivered commands are delivered. This should happen
     // very quickly. There's no way to wait here for extensive period of time.
@@ -503,7 +504,7 @@
 
     // This function must be called after the socket is completely deallocated
     // as it may cause termination of the whole 0MQ infrastructure.
-    dispatcher->destroy_socket ();
+    ctx->destroy_socket ();
 
     return 0;
 }
diff --git a/src/zmq.cpp b/src/zmq.cpp
index e97cb64..ecb3d3d 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -29,11 +29,11 @@
 #include "streamer.hpp"
 #include "socket_base.hpp"
 #include "app_thread.hpp"
-#include "dispatcher.hpp"
 #include "msg_content.hpp"
 #include "platform.hpp"
 #include "stdint.hpp"
 #include "config.hpp"
+#include "ctx.hpp"
 #include "err.hpp"
 #include "fd.hpp"
 
@@ -263,15 +263,14 @@ void *zmq_init (int /*app_threads_*/, int io_threads_, int /*flags_*/)
 #endif
 
     // Create 0MQ context.
-    zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t (
-        (uint32_t) io_threads_);
-    zmq_assert (dispatcher);
-    return (void*) dispatcher;
+    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_);
+    zmq_assert (ctx);
+    return (void*) ctx;
 }
 
-int zmq_term (void *dispatcher_)
+int zmq_term (void *ctx_)
 {
-    int rc = ((zmq::dispatcher_t*) dispatcher_)->term ();
+    int rc = ((zmq::ctx_t*) ctx_)->term ();
     int en = errno;
 
 #if defined ZMQ_HAVE_OPENPGM
@@ -284,9 +283,9 @@
     return rc;
 }
 
-void *zmq_socket (void *dispatcher_, int type_)
+void *zmq_socket (void *ctx_, int type_)
 {
-    return (void*) (((zmq::dispatcher_t*) dispatcher_)->create_socket (type_));
+    return (void*) (((zmq::ctx_t*) ctx_)->create_socket (type_));
 }
 
 int zmq_close (void *s_)
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index af71229..077286f 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -54,7 +54,7 @@ bool zmq::zmq_encoder_t::message_ready ()
     // Destroy content of the old message.
     zmq_msg_close(&in_progress);
 
-    // Read new message from the dispatcher. If there is none, return false.
+    // Read new message. If there is none, return false.
     // Note that new state is set only if write is successful. That way
     // unsuccessful write will cause retry on the next state machine
     // invocation.
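The zmq.cpp hunks show that the rename is purely internal: the public C API still traffics in an opaque void* handle, now backed by zmq::ctx_t rather than zmq::dispatcher_t, and zmq_init's app_threads and flags arguments are already ignored. A minimal sketch of a program exercising those entry points, assuming the 2.0-era API shown in this commit (ZMQ_REP is an assumed socket type from that era, not part of this diff):

```cpp
#include <zmq.h>
#include <assert.h>

int main ()
{
    //  zmq_init allocates a zmq::ctx_t under the hood and returns it as an
    //  opaque handle; the first and third arguments are ignored here.
    void *ctx = zmq_init (1, 1, 0);
    assert (ctx);

    //  zmq_socket casts the handle back to zmq::ctx_t and calls
    //  create_socket on it.
    void *s = zmq_socket (ctx, ZMQ_REP);
    assert (s);

    zmq_close (s);

    //  zmq_term forwards to zmq::ctx_t::term.
    return zmq_term (ctx);
}
```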
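The object.cpp constructors also show how the renamed context pointer propagates: root objects such as app_thread_t and io_thread_t receive the ctx_t* explicitly, and every child object copies it from its parent, so get_ctx () works anywhere in the object tree. A condensed, hypothetical sketch of that pattern (only the members visible in this diff; the real class carries much more):

```cpp
#include <stdint.h>

class ctx_t;  //  Opaque here; the real definition lives in ctx.hpp.

class object_t
{
public:

    //  Root objects get the context pointer and thread slot explicitly.
    object_t (ctx_t *ctx_, uint32_t thread_slot_) :
        ctx (ctx_),
        thread_slot (thread_slot_)
    {
    }

    //  Child objects inherit both from their parent, so the context never
    //  has to be passed around below the root.
    object_t (object_t *parent_) :
        ctx (parent_->ctx),
        thread_slot (parent_->thread_slot)
    {
    }

    ctx_t *get_ctx ()
    {
        return ctx;
    }

private:

    ctx_t *ctx;            //  Was named 'dispatcher' before this commit.
    uint32_t thread_slot;  //  Slot ID of the owning thread.
};

int main ()
{
    ctx_t *some_ctx = 0;          //  Stand-in; really created by zmq_init.
    object_t root (some_ctx, 0);  //  e.g. an app_thread_t
    object_t child (&root);       //  e.g. an object owned by that thread
    return child.get_ctx () == root.get_ctx () ? 0 : 1;
}
```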