From 835e893e54598ff474067cc68b787440baf6b05c Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 5 May 2010 14:24:54 +0200 Subject: dispatcher_t class renamed to ctx_t --- src/Makefile.am | 4 +- src/app_thread.cpp | 10 +- src/app_thread.hpp | 2 +- src/ctx.cpp | 316 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/ctx.hpp | 156 ++++++++++++++++++++++++++ src/dispatcher.cpp | 316 ---------------------------------------------------- src/dispatcher.hpp | 153 ------------------------- src/io_thread.cpp | 6 +- src/io_thread.hpp | 2 +- src/object.cpp | 28 ++--- src/object.hpp | 10 +- src/socket_base.cpp | 11 +- src/zmq.cpp | 17 ++- src/zmq_encoder.cpp | 2 +- 14 files changed, 518 insertions(+), 515 deletions(-) create mode 100644 src/ctx.cpp create mode 100644 src/ctx.hpp delete mode 100644 src/dispatcher.cpp delete mode 100644 src/dispatcher.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index 2cd5ace..70ae248 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -55,9 +55,9 @@ libzmq_la_SOURCES = app_thread.hpp \ blob.hpp \ command.hpp \ config.hpp \ + ctx.hpp \ decoder.hpp \ devpoll.hpp \ - dispatcher.hpp \ downstream.hpp \ encoder.hpp \ epoll.hpp \ @@ -122,8 +122,8 @@ libzmq_la_SOURCES = app_thread.hpp \ zmq_listener.hpp \ app_thread.cpp \ command.cpp \ + ctx.cpp \ devpoll.cpp \ - dispatcher.cpp \ downstream.cpp \ epoll.cpp \ err.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 1c06337..19f997b 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -34,7 +34,7 @@ #endif #include "app_thread.hpp" -#include "dispatcher.hpp" +#include "ctx.hpp" #include "err.hpp" #include "pipe.hpp" #include "config.hpp" @@ -57,9 +57,9 @@ #define ZMQ_DELAY_COMMANDS #endif -zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, +zmq::app_thread_t::app_thread_t (ctx_t *ctx_, uint32_t thread_slot_) : - object_t (dispatcher_, thread_slot_), + object_t (ctx_, thread_slot_), last_processing_time (0), terminated (false) { @@ -163,7 +163,7 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) break; default: if (sockets.empty ()) - get_dispatcher ()->no_sockets (this); + get_ctx ()->no_sockets (this); errno = EINVAL; return NULL; } @@ -178,7 +178,7 @@ void zmq::app_thread_t::remove_socket (socket_base_t *socket_) { sockets.erase (socket_); if (sockets.empty ()) - get_dispatcher ()->no_sockets (this); + get_ctx ()->no_sockets (this); } void zmq::app_thread_t::process_stop () diff --git a/src/app_thread.hpp b/src/app_thread.hpp index bca6947..f0deaab 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -34,7 +34,7 @@ namespace zmq { public: - app_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_); + app_thread_t (class ctx_t *ctx_, uint32_t thread_slot_); ~app_thread_t (); diff --git a/src/ctx.cpp b/src/ctx.cpp new file mode 100644 index 0000000..f0e177d --- /dev/null +++ b/src/ctx.cpp @@ -0,0 +1,316 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include +#include + +#include "../include/zmq.h" + +#include "ctx.hpp" +#include "socket_base.hpp" +#include "app_thread.hpp" +#include "io_thread.hpp" +#include "platform.hpp" +#include "err.hpp" +#include "pipe.hpp" + +#if defined ZMQ_HAVE_WINDOWS +#include "windows.h" +#endif + +zmq::ctx_t::ctx_t (uint32_t io_threads_) : + sockets (0), + terminated (false) +{ +#ifdef ZMQ_HAVE_WINDOWS + // Intialise Windows sockets. Note that WSAStartup can be called multiple + // times given that WSACleanup will be called for each WSAStartup. + WORD version_requested = MAKEWORD (2, 2); + WSADATA wsa_data; + int rc = WSAStartup (version_requested, &wsa_data); + zmq_assert (rc == 0); + zmq_assert (LOBYTE (wsa_data.wVersion) == 2 && + HIBYTE (wsa_data.wVersion) == 2); +#endif + + // Initialise the array of signalers. + signalers_count = max_app_threads + io_threads_; + signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count); + zmq_assert (signalers); + memset (signalers, 0, sizeof (signaler_t*) * signalers_count); + + // Create I/O thread objects and launch them. + for (uint32_t i = 0; i != io_threads_; i++) { + io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); + zmq_assert (io_thread); + io_threads.push_back (io_thread); + signalers [i] = io_thread->get_signaler (); + io_thread->start (); + } +} + +int zmq::ctx_t::term () +{ + // First send stop command to application threads so that any + // blocking calls are interrupted. + for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) + app_threads [i].app_thread->stop (); + + // Then mark context as terminated. + term_sync.lock (); + zmq_assert (!terminated); + terminated = true; + bool destroy = (sockets == 0); + term_sync.unlock (); + + // If there are no sockets open, destroy the context immediately. + if (destroy) + delete this; + + return 0; +} + +zmq::ctx_t::~ctx_t () +{ + // Ask I/O threads to terminate. If stop signal wasn't sent to I/O + // thread subsequent invocation of destructor would hang-up. + for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) + io_threads [i]->stop (); + + // Wait till I/O threads actually terminate. + for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) + delete io_threads [i]; + + // Close all application theads, sockets, io_objects etc. + for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) + delete app_threads [i].app_thread; + + // Deallocate all the orphaned pipes. + while (!pipes.empty ()) + delete *pipes.begin (); + + // Deallocate the array of pointers to signalers. No special work is + // needed as signalers themselves were deallocated with their + // corresponding (app_/io_) thread objects. + free (signalers); + +#ifdef ZMQ_HAVE_WINDOWS + // On Windows, uninitialise socket layer. + int rc = WSACleanup (); + wsa_assert (rc != SOCKET_ERROR); +#endif +} + +zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) +{ + app_threads_sync.lock (); + + // Find whether the calling thread has app_thread_t object associated + // already. At the same time find an unused app_thread_t so that it can + // be used if there's no associated object for the calling thread. + // Check whether thread ID is already assigned. If so, return it. + app_threads_t::size_type unused = app_threads.size (); + app_threads_t::size_type current; + for (current = 0; current != app_threads.size (); current++) { + if (app_threads [current].associated && + thread_t::equal (thread_t::id (), app_threads [current].tid)) + break; + if (!app_threads [current].associated) + unused = current; + } + + // If no app_thread_t is associated with the calling thread, + // associate it with one of the unused app_thread_t objects. + if (current == app_threads.size ()) { + + // If all the existing app_threads are already used, create one more. + if (unused == app_threads.size ()) { + + // If max_app_threads limit was reached, return error. + if (app_threads.size () == max_app_threads) { + app_threads_sync.unlock (); + errno = EMTHREAD; + return NULL; + } + + // Create the new application thread proxy object. + app_thread_info_t info; + info.associated = false; + info.app_thread = new (std::nothrow) app_thread_t (this, + io_threads.size () + app_threads.size ()); + zmq_assert (info.app_thread); + signalers [io_threads.size () + app_threads.size ()] = + info.app_thread->get_signaler (); + app_threads.push_back (info); + } + + // Incidentally, this works both when there is an unused app_thread + // and when a new one is created. + current = unused; + + // Associate the selected app_thread with the OS thread. + app_threads [current].associated = true; + app_threads [current].tid = thread_t::id (); + } + + app_thread_t *thread = app_threads [current].app_thread; + app_threads_sync.unlock (); + + socket_base_t *s = thread->create_socket (type_); + if (!s) + return NULL; + + term_sync.lock (); + sockets++; + term_sync.unlock (); + + return s; +} + +void zmq::ctx_t::destroy_socket () +{ + // If zmq_term was already called and there are no more sockets, + // terminate the whole 0MQ infrastructure. + term_sync.lock (); + zmq_assert (sockets > 0); + sockets--; + bool destroy = (sockets == 0 && terminated); + term_sync.unlock (); + + if (destroy) + delete this; +} + +void zmq::ctx_t::no_sockets (app_thread_t *thread_) +{ + app_threads_sync.lock (); + app_threads_t::size_type i; + for (i = 0; i != app_threads.size (); i++) + if (app_threads [i].app_thread == thread_) { + app_threads [i].associated = false; + break; + } + zmq_assert (i != app_threads.size ()); + app_threads_sync.unlock (); +} + +void zmq::ctx_t::send_command (uint32_t destination_, + const command_t &command_) +{ + signalers [destination_]->send (command_); +} + +bool zmq::ctx_t::recv_command (uint32_t thread_slot_, + command_t *command_, bool block_) +{ + return signalers [thread_slot_]->recv (command_, block_); +} + +zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) +{ + // Find the I/O thread with minimum load. + zmq_assert (io_threads.size () > 0); + int min_load = -1; + io_threads_t::size_type result = 0; + for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) { + if (!affinity_ || (affinity_ & (uint64_t (1) << i))) { + int load = io_threads [i]->get_load (); + if (min_load == -1 || load < min_load) { + min_load = load; + result = i; + } + } + } + zmq_assert (min_load != -1); + return io_threads [result]; +} + +void zmq::ctx_t::register_pipe (class pipe_t *pipe_) +{ + pipes_sync.lock (); + bool inserted = pipes.insert (pipe_).second; + zmq_assert (inserted); + pipes_sync.unlock (); +} + +void zmq::ctx_t::unregister_pipe (class pipe_t *pipe_) +{ + pipes_sync.lock (); + pipes_t::size_type erased = pipes.erase (pipe_); + zmq_assert (erased == 1); + pipes_sync.unlock (); +} + +int zmq::ctx_t::register_endpoint (const char *addr_, + socket_base_t *socket_) +{ + endpoints_sync.lock (); + + bool inserted = endpoints.insert (std::make_pair (std::string (addr_), + socket_)).second; + if (!inserted) { + errno = EADDRINUSE; + endpoints_sync.unlock (); + return -1; + } + + endpoints_sync.unlock (); + return 0; +} + +void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_) +{ + endpoints_sync.lock (); + + endpoints_t::iterator it = endpoints.begin (); + while (it != endpoints.end ()) { + if (it->second == socket_) { + endpoints_t::iterator to_erase = it; + it++; + endpoints.erase (to_erase); + continue; + } + it++; + } + + endpoints_sync.unlock (); +} + +zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_) +{ + endpoints_sync.lock (); + + endpoints_t::iterator it = endpoints.find (addr_); + if (it == endpoints.end ()) { + endpoints_sync.unlock (); + errno = ECONNREFUSED; + return NULL; + } + socket_base_t *endpoint = it->second; + + // Increment the command sequence number of the peer so that it won't + // get deallocated until "bind" command is issued by the caller. + // The subsequent 'bind' has to be called with inc_seqnum parameter + // set to false, so that the seqnum isn't incremented twice. + endpoint->inc_seqnum (); + + endpoints_sync.unlock (); + return endpoint; +} + diff --git a/src/ctx.hpp b/src/ctx.hpp new file mode 100644 index 0000000..c96a923 --- /dev/null +++ b/src/ctx.hpp @@ -0,0 +1,156 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_CTX_HPP_INCLUDED__ +#define __ZMQ_CTX_HPP_INCLUDED__ + +#include +#include +#include +#include + +#include "signaler.hpp" +#include "ypipe.hpp" +#include "config.hpp" +#include "mutex.hpp" +#include "stdint.hpp" +#include "thread.hpp" + +namespace zmq +{ + + // Context object encapsulates all the global state associated with + // the library. + + class ctx_t + { + public: + + // Create the context object. The argument specifies the size + // of I/O thread pool to create. + ctx_t (uint32_t io_threads_); + + // This function is called when user invokes zmq_term. If there are + // no more sockets open it'll cause all the infrastructure to be shut + // down. If there are open sockets still, the deallocation happens + // after the last one is closed. + int term (); + + // Create a socket. + class socket_base_t *create_socket (int type_); + + // Destroy a socket. + void destroy_socket (); + + // Called by app_thread_t when it has no more sockets. The function + // should disassociate the object from the current OS thread. + void no_sockets (class app_thread_t *thread_); + + // Send command to the destination thread. + void send_command (uint32_t destination_, const command_t &command_); + + // Receive command from another thread. + bool recv_command (uint32_t thread_slot_, command_t *command_, + bool block_); + + // Returns the I/O thread that is the least busy at the moment. + // Taskset specifies which I/O threads are eligible (0 = all). + class io_thread_t *choose_io_thread (uint64_t taskset_); + + // All pipes are registered with the context so that even the + // orphaned pipes can be deallocated on the terminal shutdown. + void register_pipe (class pipe_t *pipe_); + void unregister_pipe (class pipe_t *pipe_); + + // Management of inproc endpoints. + int register_endpoint (const char *addr_, class socket_base_t *socket_); + void unregister_endpoints (class socket_base_t *socket_); + class socket_base_t *find_endpoint (const char *addr_); + + private: + + ~ctx_t (); + + struct app_thread_info_t + { + // If false, 0MQ application thread is free, there's no associated + // OS thread. + bool associated; + + // ID of the associated OS thread. If 'associated' is false, + // this field contains bogus data. + thread_t::id_t tid; + + // Pointer to the 0MQ application thread object. + class app_thread_t *app_thread; + }; + + // Application threads. + typedef std::vector app_threads_t; + app_threads_t app_threads; + + // Synchronisation of accesses to shared application thread data. + mutex_t app_threads_sync; + + // I/O threads. + typedef std::vector io_threads_t; + io_threads_t io_threads; + + // Array of pointers to signalers for both application and I/O threads. + int signalers_count; + signaler_t **signalers; + + // As pipes may reside in orphaned state in particular moments + // of the pipe shutdown process, i.e. neither pipe reader nor + // pipe writer hold reference to the pipe, we have to hold references + // to all pipes in context so that we can deallocate them + // during terminal shutdown even though it conincides with the + // pipe being in the orphaned state. + typedef std::set pipes_t; + pipes_t pipes; + + // Synchronisation of access to the pipes repository. + mutex_t pipes_sync; + + // Number of sockets alive. + int sockets; + + // If true, zmq_term was already called. When last socket is closed + // the whole 0MQ infrastructure should be deallocated. + bool terminated; + + // Synchronisation of access to the termination data (socket count + // and 'terminated' flag). + mutex_t term_sync; + + // List of inproc endpoints within this context. + typedef std::map endpoints_t; + endpoints_t endpoints; + + // Synchronisation of access to the list of inproc endpoints. + mutex_t endpoints_sync; + + ctx_t (const ctx_t&); + void operator = (const ctx_t&); + }; + +} + +#endif + diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp deleted file mode 100644 index 2ae99ba..0000000 --- a/src/dispatcher.cpp +++ /dev/null @@ -1,316 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include -#include - -#include "../include/zmq.h" - -#include "dispatcher.hpp" -#include "socket_base.hpp" -#include "app_thread.hpp" -#include "io_thread.hpp" -#include "platform.hpp" -#include "err.hpp" -#include "pipe.hpp" - -#if defined ZMQ_HAVE_WINDOWS -#include "windows.h" -#endif - -zmq::dispatcher_t::dispatcher_t (uint32_t io_threads_) : - sockets (0), - terminated (false) -{ -#ifdef ZMQ_HAVE_WINDOWS - // Intialise Windows sockets. Note that WSAStartup can be called multiple - // times given that WSACleanup will be called for each WSAStartup. - WORD version_requested = MAKEWORD (2, 2); - WSADATA wsa_data; - int rc = WSAStartup (version_requested, &wsa_data); - zmq_assert (rc == 0); - zmq_assert (LOBYTE (wsa_data.wVersion) == 2 && - HIBYTE (wsa_data.wVersion) == 2); -#endif - - // Initialise the array of signalers. - signalers_count = max_app_threads + io_threads_; - signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count); - zmq_assert (signalers); - memset (signalers, 0, sizeof (signaler_t*) * signalers_count); - - // Create I/O thread objects and launch them. - for (uint32_t i = 0; i != io_threads_; i++) { - io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); - zmq_assert (io_thread); - io_threads.push_back (io_thread); - signalers [i] = io_thread->get_signaler (); - io_thread->start (); - } -} - -int zmq::dispatcher_t::term () -{ - // First send stop command to application threads so that any - // blocking calls are interrupted. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - app_threads [i].app_thread->stop (); - - // Then mark context as terminated. - term_sync.lock (); - zmq_assert (!terminated); - terminated = true; - bool destroy = (sockets == 0); - term_sync.unlock (); - - // If there are no sockets open, destroy the context immediately. - if (destroy) - delete this; - - return 0; -} - -zmq::dispatcher_t::~dispatcher_t () -{ - // Ask I/O threads to terminate. If stop signal wasn't sent to I/O - // thread subsequent invocation of destructor would hang-up. - for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) - io_threads [i]->stop (); - - // Wait till I/O threads actually terminate. - for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) - delete io_threads [i]; - - // Close all application theads, sockets, io_objects etc. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - delete app_threads [i].app_thread; - - // Deallocate all the orphaned pipes. - while (!pipes.empty ()) - delete *pipes.begin (); - - // Deallocate the array of pointers to signalers. No special work is - // needed as signalers themselves were deallocated with their - // corresponding (app_/io_) thread objects. - free (signalers); - -#ifdef ZMQ_HAVE_WINDOWS - // On Windows, uninitialise socket layer. - int rc = WSACleanup (); - wsa_assert (rc != SOCKET_ERROR); -#endif -} - -zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_) -{ - app_threads_sync.lock (); - - // Find whether the calling thread has app_thread_t object associated - // already. At the same time find an unused app_thread_t so that it can - // be used if there's no associated object for the calling thread. - // Check whether thread ID is already assigned. If so, return it. - app_threads_t::size_type unused = app_threads.size (); - app_threads_t::size_type current; - for (current = 0; current != app_threads.size (); current++) { - if (app_threads [current].associated && - thread_t::equal (thread_t::id (), app_threads [current].tid)) - break; - if (!app_threads [current].associated) - unused = current; - } - - // If no app_thread_t is associated with the calling thread, - // associate it with one of the unused app_thread_t objects. - if (current == app_threads.size ()) { - - // If all the existing app_threads are already used, create one more. - if (unused == app_threads.size ()) { - - // If max_app_threads limit was reached, return error. - if (app_threads.size () == max_app_threads) { - app_threads_sync.unlock (); - errno = EMTHREAD; - return NULL; - } - - // Create the new application thread proxy object. - app_thread_info_t info; - info.associated = false; - info.app_thread = new (std::nothrow) app_thread_t (this, - io_threads.size () + app_threads.size ()); - zmq_assert (info.app_thread); - signalers [io_threads.size () + app_threads.size ()] = - info.app_thread->get_signaler (); - app_threads.push_back (info); - } - - // Incidentally, this works both when there is an unused app_thread - // and when a new one is created. - current = unused; - - // Associate the selected app_thread with the OS thread. - app_threads [current].associated = true; - app_threads [current].tid = thread_t::id (); - } - - app_thread_t *thread = app_threads [current].app_thread; - app_threads_sync.unlock (); - - socket_base_t *s = thread->create_socket (type_); - if (!s) - return NULL; - - term_sync.lock (); - sockets++; - term_sync.unlock (); - - return s; -} - -void zmq::dispatcher_t::destroy_socket () -{ - // If zmq_term was already called and there are no more sockets, - // terminate the whole 0MQ infrastructure. - term_sync.lock (); - zmq_assert (sockets > 0); - sockets--; - bool destroy = (sockets == 0 && terminated); - term_sync.unlock (); - - if (destroy) - delete this; -} - -void zmq::dispatcher_t::no_sockets (app_thread_t *thread_) -{ - app_threads_sync.lock (); - app_threads_t::size_type i; - for (i = 0; i != app_threads.size (); i++) - if (app_threads [i].app_thread == thread_) { - app_threads [i].associated = false; - break; - } - zmq_assert (i != app_threads.size ()); - app_threads_sync.unlock (); -} - -void zmq::dispatcher_t::send_command (uint32_t destination_, - const command_t &command_) -{ - signalers [destination_]->send (command_); -} - -bool zmq::dispatcher_t::recv_command (uint32_t thread_slot_, - command_t *command_, bool block_) -{ - return signalers [thread_slot_]->recv (command_, block_); -} - -zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_) -{ - // Find the I/O thread with minimum load. - zmq_assert (io_threads.size () > 0); - int min_load = -1; - io_threads_t::size_type result = 0; - for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) { - if (!affinity_ || (affinity_ & (uint64_t (1) << i))) { - int load = io_threads [i]->get_load (); - if (min_load == -1 || load < min_load) { - min_load = load; - result = i; - } - } - } - zmq_assert (min_load != -1); - return io_threads [result]; -} - -void zmq::dispatcher_t::register_pipe (class pipe_t *pipe_) -{ - pipes_sync.lock (); - bool inserted = pipes.insert (pipe_).second; - zmq_assert (inserted); - pipes_sync.unlock (); -} - -void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_) -{ - pipes_sync.lock (); - pipes_t::size_type erased = pipes.erase (pipe_); - zmq_assert (erased == 1); - pipes_sync.unlock (); -} - -int zmq::dispatcher_t::register_endpoint (const char *addr_, - socket_base_t *socket_) -{ - endpoints_sync.lock (); - - bool inserted = endpoints.insert (std::make_pair (std::string (addr_), - socket_)).second; - if (!inserted) { - errno = EADDRINUSE; - endpoints_sync.unlock (); - return -1; - } - - endpoints_sync.unlock (); - return 0; -} - -void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_) -{ - endpoints_sync.lock (); - - endpoints_t::iterator it = endpoints.begin (); - while (it != endpoints.end ()) { - if (it->second == socket_) { - endpoints_t::iterator to_erase = it; - it++; - endpoints.erase (to_erase); - continue; - } - it++; - } - - endpoints_sync.unlock (); -} - -zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_) -{ - endpoints_sync.lock (); - - endpoints_t::iterator it = endpoints.find (addr_); - if (it == endpoints.end ()) { - endpoints_sync.unlock (); - errno = ECONNREFUSED; - return NULL; - } - socket_base_t *endpoint = it->second; - - // Increment the command sequence number of the peer so that it won't - // get deallocated until "bind" command is issued by the caller. - // The subsequent 'bind' has to be called with inc_seqnum parameter - // set to false, so that the seqnum isn't incremented twice. - endpoint->inc_seqnum (); - - endpoints_sync.unlock (); - return endpoint; -} - diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp deleted file mode 100644 index cad4844..0000000 --- a/src/dispatcher.hpp +++ /dev/null @@ -1,153 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__ -#define __ZMQ_DISPATCHER_HPP_INCLUDED__ - -#include -#include -#include -#include - -#include "signaler.hpp" -#include "ypipe.hpp" -#include "config.hpp" -#include "mutex.hpp" -#include "stdint.hpp" -#include "thread.hpp" - -namespace zmq -{ - - class dispatcher_t - { - public: - - // Create the dispatcher object. The argument specifies the size - // of I/O thread pool to create. - dispatcher_t (uint32_t io_threads_); - - // This function is called when user invokes zmq_term. If there are - // no more sockets open it'll cause all the infrastructure to be shut - // down. If there are open sockets still, the deallocation happens - // after the last one is closed. - int term (); - - // Create a socket. - class socket_base_t *create_socket (int type_); - - // Destroy a socket. - void destroy_socket (); - - // Called by app_thread_t when it has no more sockets. The function - // should disassociate the object from the current OS thread. - void no_sockets (class app_thread_t *thread_); - - // Send command to the destination thread. - void send_command (uint32_t destination_, const command_t &command_); - - // Receive command from another thread. - bool recv_command (uint32_t thread_slot_, command_t *command_, - bool block_); - - // Returns the I/O thread that is the least busy at the moment. - // Taskset specifies which I/O threads are eligible (0 = all). - class io_thread_t *choose_io_thread (uint64_t taskset_); - - // All pipes are registered with the dispatcher so that even the - // orphaned pipes can be deallocated on the terminal shutdown. - void register_pipe (class pipe_t *pipe_); - void unregister_pipe (class pipe_t *pipe_); - - // Management of inproc endpoints. - int register_endpoint (const char *addr_, class socket_base_t *socket_); - void unregister_endpoints (class socket_base_t *socket_); - class socket_base_t *find_endpoint (const char *addr_); - - private: - - ~dispatcher_t (); - - struct app_thread_info_t - { - // If false, 0MQ application thread is free, there's no associated - // OS thread. - bool associated; - - // ID of the associated OS thread. If 'associated' is false, - // this field contains bogus data. - thread_t::id_t tid; - - // Pointer to the 0MQ application thread object. - class app_thread_t *app_thread; - }; - - // Application threads. - typedef std::vector app_threads_t; - app_threads_t app_threads; - - // Synchronisation of accesses to shared application thread data. - mutex_t app_threads_sync; - - // I/O threads. - typedef std::vector io_threads_t; - io_threads_t io_threads; - - // Array of pointers to signalers for both application and I/O threads. - int signalers_count; - signaler_t **signalers; - - // As pipes may reside in orphaned state in particular moments - // of the pipe shutdown process, i.e. neither pipe reader nor - // pipe writer hold reference to the pipe, we have to hold references - // to all pipes in dispatcher so that we can deallocate them - // during terminal shutdown even though it conincides with the - // pipe being in the orphaned state. - typedef std::set pipes_t; - pipes_t pipes; - - // Synchronisation of access to the pipes repository. - mutex_t pipes_sync; - - // Number of sockets alive. - int sockets; - - // If true, zmq_term was already called. When last socket is closed - // the whole 0MQ infrastructure should be deallocated. - bool terminated; - - // Synchronisation of access to the termination data (socket count - // and 'terminated' flag). - mutex_t term_sync; - - // List of inproc endpoints within this context. - typedef std::map endpoints_t; - endpoints_t endpoints; - - // Synchronisation of access to the list of inproc endpoints. - mutex_t endpoints_sync; - - dispatcher_t (const dispatcher_t&); - void operator = (const dispatcher_t&); - }; - -} - -#endif - diff --git a/src/io_thread.cpp b/src/io_thread.cpp index 92c314a..fac6961 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -24,11 +24,11 @@ #include "io_thread.hpp" #include "platform.hpp" #include "err.hpp" -#include "dispatcher.hpp" +#include "ctx.hpp" -zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, +zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t thread_slot_) : - object_t (dispatcher_, thread_slot_) + object_t (ctx_, thread_slot_) { poller = new (std::nothrow) poller_t; zmq_assert (poller); diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 7e105b3..3d832c0 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -38,7 +38,7 @@ namespace zmq { public: - io_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_); + io_thread_t (class ctx_t *ctx_, uint32_t thread_slot_); // Clean-up. If the thread was started, it's neccessary to call 'stop' // before invoking destructor. Otherwise the destructor would hang up. diff --git a/src/object.cpp b/src/object.cpp index c5c89cb..324450f 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -20,7 +20,7 @@ #include #include "object.hpp" -#include "dispatcher.hpp" +#include "ctx.hpp" #include "err.hpp" #include "pipe.hpp" #include "io_thread.hpp" @@ -28,14 +28,14 @@ #include "session.hpp" #include "socket_base.hpp" -zmq::object_t::object_t (dispatcher_t *dispatcher_, uint32_t thread_slot_) : - dispatcher (dispatcher_), +zmq::object_t::object_t (ctx_t *ctx_, uint32_t thread_slot_) : + ctx (ctx_), thread_slot (thread_slot_) { } zmq::object_t::object_t (object_t *parent_) : - dispatcher (parent_->dispatcher), + ctx (parent_->ctx), thread_slot (parent_->thread_slot) { } @@ -49,9 +49,9 @@ uint32_t zmq::object_t::get_thread_slot () return thread_slot; } -zmq::dispatcher_t *zmq::object_t::get_dispatcher () +zmq::ctx_t *zmq::object_t::get_ctx () { - return dispatcher; + return ctx; } void zmq::object_t::process_command (command_t &cmd_) @@ -125,32 +125,32 @@ void zmq::object_t::process_command (command_t &cmd_) void zmq::object_t::register_pipe (class pipe_t *pipe_) { - dispatcher->register_pipe (pipe_); + ctx->register_pipe (pipe_); } void zmq::object_t::unregister_pipe (class pipe_t *pipe_) { - dispatcher->unregister_pipe (pipe_); + ctx->unregister_pipe (pipe_); } int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_) { - return dispatcher->register_endpoint (addr_, socket_); + return ctx->register_endpoint (addr_, socket_); } void zmq::object_t::unregister_endpoints (socket_base_t *socket_) { - return dispatcher->unregister_endpoints (socket_); + return ctx->unregister_endpoints (socket_); } zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_) { - return dispatcher->find_endpoint (addr_); + return ctx->find_endpoint (addr_); } zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) { - return dispatcher->choose_io_thread (taskset_); + return ctx->choose_io_thread (taskset_); } void zmq::object_t::send_stop () @@ -160,7 +160,7 @@ void zmq::object_t::send_stop () command_t cmd; cmd.destination = this; cmd.type = command_t::stop; - dispatcher->send_command (thread_slot, cmd); + ctx->send_command (thread_slot, cmd); } void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_) @@ -369,6 +369,6 @@ void zmq::object_t::process_seqnum () void zmq::object_t::send_command (command_t &cmd_) { - dispatcher->send_command (cmd_.destination->get_thread_slot (), cmd_); + ctx->send_command (cmd_.destination->get_thread_slot (), cmd_); } diff --git a/src/object.hpp b/src/object.hpp index 0084e1a..a38b0a6 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -32,15 +32,15 @@ namespace zmq { public: - object_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_); + object_t (class ctx_t *ctx_, uint32_t thread_slot_); object_t (object_t *parent_); virtual ~object_t (); uint32_t get_thread_slot (); - dispatcher_t *get_dispatcher (); + ctx_t *get_ctx (); void process_command (struct command_t &cmd_); - // Allow pipe to access corresponding dispatcher functions. + // Allow pipe to access corresponding context functions. void register_pipe (class pipe_t *pipe_); void unregister_pipe (class pipe_t *pipe_); @@ -101,8 +101,8 @@ namespace zmq private: - // Pointer to the root of the infrastructure. - class dispatcher_t *dispatcher; + // Context provides access to the global state. + class ctx_t *ctx; // Slot ID of the thread the object belongs to. uint32_t thread_slot; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index b186683..e946526 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -25,7 +25,7 @@ #include "socket_base.hpp" #include "app_thread.hpp" -#include "dispatcher.hpp" + #include "zmq_listener.hpp" #include "zmq_connecter.hpp" #include "io_thread.hpp" @@ -34,6 +34,7 @@ #include "owned.hpp" #include "pipe.hpp" #include "err.hpp" +#include "ctx.hpp" #include "platform.hpp" #include "pgm_sender.hpp" #include "pgm_receiver.hpp" @@ -456,14 +457,14 @@ int zmq::socket_base_t::close () // Let the thread know that the socket is no longer available. app_thread->remove_socket (this); - // Pointer to the dispatcher must be retrieved before the socket is + // Pointer to the context must be retrieved before the socket is // deallocated. Afterwards it is not available. - dispatcher_t *dispatcher = get_dispatcher (); + ctx_t *ctx = get_ctx (); // Unregister all inproc endpoints associated with this socket. // From this point we are sure that inc_seqnum won't be called again // on this object. - dispatcher->unregister_endpoints (this); + ctx->unregister_endpoints (this); // Wait till all undelivered commands are delivered. This should happen // very quickly. There's no way to wait here for extensive period of time. @@ -503,7 +504,7 @@ int zmq::socket_base_t::close () // This function must be called after the socket is completely deallocated // as it may cause termination of the whole 0MQ infrastructure. - dispatcher->destroy_socket (); + ctx->destroy_socket (); return 0; } diff --git a/src/zmq.cpp b/src/zmq.cpp index e97cb64..ecb3d3d 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -29,11 +29,11 @@ #include "streamer.hpp" #include "socket_base.hpp" #include "app_thread.hpp" -#include "dispatcher.hpp" #include "msg_content.hpp" #include "platform.hpp" #include "stdint.hpp" #include "config.hpp" +#include "ctx.hpp" #include "err.hpp" #include "fd.hpp" @@ -263,15 +263,14 @@ void *zmq_init (int /*app_threads_*/, int io_threads_, int /*flags_*/) #endif // Create 0MQ context. - zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t ( - (uint32_t) io_threads_); - zmq_assert (dispatcher); - return (void*) dispatcher; + zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_); + zmq_assert (ctx); + return (void*) ctx; } -int zmq_term (void *dispatcher_) +int zmq_term (void *ctx_) { - int rc = ((zmq::dispatcher_t*) dispatcher_)->term (); + int rc = ((zmq::ctx_t*) ctx_)->term (); int en = errno; #if defined ZMQ_HAVE_OPENPGM @@ -284,9 +283,9 @@ int zmq_term (void *dispatcher_) return rc; } -void *zmq_socket (void *dispatcher_, int type_) +void *zmq_socket (void *ctx_, int type_) { - return (void*) (((zmq::dispatcher_t*) dispatcher_)->create_socket (type_)); + return (void*) (((zmq::ctx_t*) ctx_)->create_socket (type_)); } int zmq_close (void *s_) diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index af71229..077286f 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -54,7 +54,7 @@ bool zmq::zmq_encoder_t::message_ready () // Destroy content of the old message. zmq_msg_close(&in_progress); - // Read new message from the dispatcher. If there is none, return false. + // Read new message. If there is none, return false. // Note that new state is set only if write is successful. That way // unsuccessful write will cause retry on the next state machine // invocation. -- cgit v1.2.3