diff options
| author | Martin Sustrik <sustrik@250bpm.com> | 2010-05-05 14:24:54 +0200 | 
|---|---|---|
| committer | Martin Sustrik <sustrik@250bpm.com> | 2010-05-05 14:24:54 +0200 | 
| commit | 835e893e54598ff474067cc68b787440baf6b05c (patch) | |
| tree | b529fbc8fea71f01a38064a65722924cdcbc4ed4 /src | |
| parent | 10f5334f2891b187ce57f38186cf977406097ab0 (diff) | |
dispatcher_t class renamed to ctx_t
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile.am | 4 | ||||
| -rw-r--r-- | src/app_thread.cpp | 10 | ||||
| -rw-r--r-- | src/app_thread.hpp | 2 | ||||
| -rw-r--r-- | src/ctx.cpp (renamed from src/dispatcher.cpp) | 30 | ||||
| -rw-r--r-- | src/ctx.hpp (renamed from src/dispatcher.hpp) | 23 | ||||
| -rw-r--r-- | src/io_thread.cpp | 6 | ||||
| -rw-r--r-- | src/io_thread.hpp | 2 | ||||
| -rw-r--r-- | src/object.cpp | 28 | ||||
| -rw-r--r-- | src/object.hpp | 10 | ||||
| -rw-r--r-- | src/socket_base.cpp | 11 | ||||
| -rw-r--r-- | src/zmq.cpp | 17 | ||||
| -rw-r--r-- | src/zmq_encoder.cpp | 2 | 
12 files changed, 74 insertions, 71 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index 2cd5ace..70ae248 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -55,9 +55,9 @@ libzmq_la_SOURCES = app_thread.hpp \      blob.hpp \      command.hpp \      config.hpp \ +    ctx.hpp \      decoder.hpp \      devpoll.hpp \ -    dispatcher.hpp \      downstream.hpp \      encoder.hpp \      epoll.hpp \ @@ -122,8 +122,8 @@ libzmq_la_SOURCES = app_thread.hpp \      zmq_listener.hpp \      app_thread.cpp \      command.cpp \ +    ctx.cpp \      devpoll.cpp \ -    dispatcher.cpp \      downstream.cpp \      epoll.cpp \      err.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 1c06337..19f997b 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -34,7 +34,7 @@  #endif  #include "app_thread.hpp" -#include "dispatcher.hpp" +#include "ctx.hpp"  #include "err.hpp"  #include "pipe.hpp"  #include "config.hpp" @@ -57,9 +57,9 @@  #define ZMQ_DELAY_COMMANDS  #endif -zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, +zmq::app_thread_t::app_thread_t (ctx_t *ctx_,          uint32_t thread_slot_) : -    object_t (dispatcher_, thread_slot_), +    object_t (ctx_, thread_slot_),      last_processing_time (0),      terminated (false)  { @@ -163,7 +163,7 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)          break;      default:          if (sockets.empty ()) -            get_dispatcher ()->no_sockets (this); +            get_ctx ()->no_sockets (this);          errno = EINVAL;          return NULL;      } @@ -178,7 +178,7 @@ void zmq::app_thread_t::remove_socket (socket_base_t *socket_)  {      sockets.erase (socket_);      if (sockets.empty ()) -        get_dispatcher ()->no_sockets (this); +        get_ctx ()->no_sockets (this);  }  void zmq::app_thread_t::process_stop () diff --git a/src/app_thread.hpp b/src/app_thread.hpp index bca6947..f0deaab 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -34,7 +34,7 @@ namespace zmq      {      public: -        app_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_); +        app_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);          ~app_thread_t (); diff --git a/src/dispatcher.cpp b/src/ctx.cpp index 2ae99ba..f0e177d 100644 --- a/src/dispatcher.cpp +++ b/src/ctx.cpp @@ -22,7 +22,7 @@  #include "../include/zmq.h" -#include "dispatcher.hpp" +#include "ctx.hpp"  #include "socket_base.hpp"  #include "app_thread.hpp"  #include "io_thread.hpp" @@ -34,7 +34,7 @@  #include "windows.h"  #endif -zmq::dispatcher_t::dispatcher_t (uint32_t io_threads_) : +zmq::ctx_t::ctx_t (uint32_t io_threads_) :      sockets (0),      terminated (false)  { @@ -65,7 +65,7 @@ zmq::dispatcher_t::dispatcher_t (uint32_t io_threads_) :      }  } -int zmq::dispatcher_t::term () +int zmq::ctx_t::term ()  {      //  First send stop command to application threads so that any      //  blocking calls are interrupted. @@ -86,7 +86,7 @@ int zmq::dispatcher_t::term ()      return 0;  } -zmq::dispatcher_t::~dispatcher_t () +zmq::ctx_t::~ctx_t ()  {      //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O      //  thread subsequent invocation of destructor would hang-up. @@ -117,7 +117,7 @@ zmq::dispatcher_t::~dispatcher_t ()  #endif  } -zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_) +zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)  {      app_threads_sync.lock (); @@ -183,7 +183,7 @@ zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)      return s;  } -void zmq::dispatcher_t::destroy_socket () +void zmq::ctx_t::destroy_socket ()  {      //  If zmq_term was already called and there are no more sockets,      //  terminate the whole 0MQ infrastructure. @@ -197,7 +197,7 @@ void zmq::dispatcher_t::destroy_socket ()         delete this;  } -void zmq::dispatcher_t::no_sockets (app_thread_t *thread_) +void zmq::ctx_t::no_sockets (app_thread_t *thread_)  {      app_threads_sync.lock ();      app_threads_t::size_type i; @@ -210,19 +210,19 @@ void zmq::dispatcher_t::no_sockets (app_thread_t *thread_)      app_threads_sync.unlock ();  } -void zmq::dispatcher_t::send_command (uint32_t destination_, +void zmq::ctx_t::send_command (uint32_t destination_,      const command_t &command_)  {      signalers [destination_]->send (command_);  } -bool zmq::dispatcher_t::recv_command (uint32_t thread_slot_, +bool zmq::ctx_t::recv_command (uint32_t thread_slot_,      command_t *command_, bool block_)  {      return signalers [thread_slot_]->recv (command_, block_);  } -zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_) +zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)  {      //  Find the I/O thread with minimum load.      zmq_assert (io_threads.size () > 0); @@ -241,7 +241,7 @@ zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_)      return io_threads [result];  } -void zmq::dispatcher_t::register_pipe (class pipe_t *pipe_) +void zmq::ctx_t::register_pipe (class pipe_t *pipe_)  {      pipes_sync.lock ();      bool inserted = pipes.insert (pipe_).second; @@ -249,7 +249,7 @@ void zmq::dispatcher_t::register_pipe (class pipe_t *pipe_)      pipes_sync.unlock ();  } -void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_) +void zmq::ctx_t::unregister_pipe (class pipe_t *pipe_)  {      pipes_sync.lock ();      pipes_t::size_type erased = pipes.erase (pipe_); @@ -257,7 +257,7 @@ void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_)      pipes_sync.unlock ();  } -int zmq::dispatcher_t::register_endpoint (const char *addr_, +int zmq::ctx_t::register_endpoint (const char *addr_,      socket_base_t *socket_)  {      endpoints_sync.lock (); @@ -274,7 +274,7 @@ int zmq::dispatcher_t::register_endpoint (const char *addr_,      return 0;  } -void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_) +void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)  {      endpoints_sync.lock (); @@ -292,7 +292,7 @@ void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_)      endpoints_sync.unlock ();  } -zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_) +zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_)  {       endpoints_sync.lock (); diff --git a/src/dispatcher.hpp b/src/ctx.hpp index cad4844..c96a923 100644 --- a/src/dispatcher.hpp +++ b/src/ctx.hpp @@ -17,8 +17,8 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ -#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__ -#define __ZMQ_DISPATCHER_HPP_INCLUDED__ +#ifndef __ZMQ_CTX_HPP_INCLUDED__ +#define __ZMQ_CTX_HPP_INCLUDED__  #include <vector>  #include <set> @@ -34,14 +34,17 @@  namespace zmq  { + +    //  Context object encapsulates all the global state associated with +    //  the library. -    class dispatcher_t +    class ctx_t      {      public: -        //  Create the dispatcher object. The argument specifies the size +        //  Create the context object. The argument specifies the size          //  of I/O thread pool to create. -        dispatcher_t (uint32_t io_threads_); +        ctx_t (uint32_t io_threads_);          //  This function is called when user invokes zmq_term. If there are          //  no more sockets open it'll cause all the infrastructure to be shut @@ -70,7 +73,7 @@ namespace zmq          //  Taskset specifies which I/O threads are eligible (0 = all).          class io_thread_t *choose_io_thread (uint64_t taskset_); -        //  All pipes are registered with the dispatcher so that even the +        //  All pipes are registered with the context so that even the          //  orphaned pipes can be deallocated on the terminal shutdown.          void register_pipe (class pipe_t *pipe_);          void unregister_pipe (class pipe_t *pipe_); @@ -82,7 +85,7 @@ namespace zmq      private: -        ~dispatcher_t (); +        ~ctx_t ();          struct app_thread_info_t          { @@ -116,7 +119,7 @@ namespace zmq          //  As pipes may reside in orphaned state in particular moments          //  of the pipe shutdown process, i.e. neither pipe reader nor          //  pipe writer hold reference to the pipe, we have to hold references -        //  to all pipes in dispatcher so that we can deallocate them +        //  to all pipes in context so that we can deallocate them          //  during terminal shutdown even though it conincides with the          //  pipe being in the orphaned state.          typedef std::set <class pipe_t*> pipes_t; @@ -143,8 +146,8 @@ namespace zmq          //  Synchronisation of access to the list of inproc endpoints.          mutex_t endpoints_sync; -        dispatcher_t (const dispatcher_t&); -        void operator = (const dispatcher_t&); +        ctx_t (const ctx_t&); +        void operator = (const ctx_t&);      };  } diff --git a/src/io_thread.cpp b/src/io_thread.cpp index 92c314a..fac6961 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -24,11 +24,11 @@  #include "io_thread.hpp"  #include "platform.hpp"  #include "err.hpp" -#include "dispatcher.hpp" +#include "ctx.hpp" -zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, +zmq::io_thread_t::io_thread_t (ctx_t *ctx_,        uint32_t thread_slot_) : -    object_t (dispatcher_, thread_slot_) +    object_t (ctx_, thread_slot_)  {      poller = new (std::nothrow) poller_t;      zmq_assert (poller); diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 7e105b3..3d832c0 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -38,7 +38,7 @@ namespace zmq      {      public: -        io_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_); +        io_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);          //  Clean-up. If the thread was started, it's neccessary to call 'stop'          //  before invoking destructor. Otherwise the destructor would hang up. diff --git a/src/object.cpp b/src/object.cpp index c5c89cb..324450f 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -20,7 +20,7 @@  #include <string.h>  #include "object.hpp" -#include "dispatcher.hpp" +#include "ctx.hpp"  #include "err.hpp"  #include "pipe.hpp"  #include "io_thread.hpp" @@ -28,14 +28,14 @@  #include "session.hpp"  #include "socket_base.hpp" -zmq::object_t::object_t (dispatcher_t *dispatcher_, uint32_t thread_slot_) : -    dispatcher (dispatcher_), +zmq::object_t::object_t (ctx_t *ctx_, uint32_t thread_slot_) : +    ctx (ctx_),      thread_slot (thread_slot_)  {  }  zmq::object_t::object_t (object_t *parent_) : -    dispatcher (parent_->dispatcher), +    ctx (parent_->ctx),      thread_slot (parent_->thread_slot)  {  } @@ -49,9 +49,9 @@ uint32_t zmq::object_t::get_thread_slot ()      return thread_slot;  } -zmq::dispatcher_t *zmq::object_t::get_dispatcher () +zmq::ctx_t *zmq::object_t::get_ctx ()  { -    return dispatcher; +    return ctx;  }  void zmq::object_t::process_command (command_t &cmd_) @@ -125,32 +125,32 @@ void zmq::object_t::process_command (command_t &cmd_)  void zmq::object_t::register_pipe (class pipe_t *pipe_)  { -    dispatcher->register_pipe (pipe_); +    ctx->register_pipe (pipe_);  }  void zmq::object_t::unregister_pipe (class pipe_t *pipe_)  { -    dispatcher->unregister_pipe (pipe_); +    ctx->unregister_pipe (pipe_);  }  int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_)  { -    return dispatcher->register_endpoint (addr_, socket_); +    return ctx->register_endpoint (addr_, socket_);  }  void zmq::object_t::unregister_endpoints (socket_base_t *socket_)  { -    return dispatcher->unregister_endpoints (socket_); +    return ctx->unregister_endpoints (socket_);  }  zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_)  { -    return dispatcher->find_endpoint (addr_); +    return ctx->find_endpoint (addr_);  }  zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)  { -    return dispatcher->choose_io_thread (taskset_); +    return ctx->choose_io_thread (taskset_);  }  void zmq::object_t::send_stop () @@ -160,7 +160,7 @@ void zmq::object_t::send_stop ()      command_t cmd;      cmd.destination = this;      cmd.type = command_t::stop; -    dispatcher->send_command (thread_slot, cmd); +    ctx->send_command (thread_slot, cmd);  }  void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_) @@ -369,6 +369,6 @@ void zmq::object_t::process_seqnum ()  void zmq::object_t::send_command (command_t &cmd_)  { -    dispatcher->send_command (cmd_.destination->get_thread_slot (), cmd_); +    ctx->send_command (cmd_.destination->get_thread_slot (), cmd_);  } diff --git a/src/object.hpp b/src/object.hpp index 0084e1a..a38b0a6 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -32,15 +32,15 @@ namespace zmq      {      public: -        object_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_); +        object_t (class ctx_t *ctx_, uint32_t thread_slot_);          object_t (object_t *parent_);          virtual ~object_t ();          uint32_t get_thread_slot (); -        dispatcher_t *get_dispatcher (); +        ctx_t *get_ctx ();          void process_command (struct command_t &cmd_); -        //  Allow pipe to access corresponding dispatcher functions. +        //  Allow pipe to access corresponding context functions.          void register_pipe (class pipe_t *pipe_);          void unregister_pipe (class pipe_t *pipe_); @@ -101,8 +101,8 @@ namespace zmq      private: -        //  Pointer to the root of the infrastructure. -        class dispatcher_t *dispatcher; +        //  Context provides access to the global state. +        class ctx_t *ctx;          //  Slot ID of the thread the object belongs to.          uint32_t thread_slot; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index b186683..e946526 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -25,7 +25,7 @@  #include "socket_base.hpp"  #include "app_thread.hpp" -#include "dispatcher.hpp" +  #include "zmq_listener.hpp"  #include "zmq_connecter.hpp"  #include "io_thread.hpp" @@ -34,6 +34,7 @@  #include "owned.hpp"  #include "pipe.hpp"  #include "err.hpp" +#include "ctx.hpp"  #include "platform.hpp"  #include "pgm_sender.hpp"  #include "pgm_receiver.hpp" @@ -456,14 +457,14 @@ int zmq::socket_base_t::close ()      //  Let the thread know that the socket is no longer available.      app_thread->remove_socket (this); -    //  Pointer to the dispatcher must be retrieved before the socket is +    //  Pointer to the context must be retrieved before the socket is      //  deallocated. Afterwards it is not available. -    dispatcher_t *dispatcher = get_dispatcher (); +    ctx_t *ctx = get_ctx ();      //  Unregister all inproc endpoints associated with this socket.      //  From this point we are sure that inc_seqnum won't be called again      //  on this object. -    dispatcher->unregister_endpoints (this); +    ctx->unregister_endpoints (this);      //  Wait till all undelivered commands are delivered. This should happen      //  very quickly. There's no way to wait here for extensive period of time. @@ -503,7 +504,7 @@ int zmq::socket_base_t::close ()      //  This function must be called after the socket is completely deallocated      //  as it may cause termination of the whole 0MQ infrastructure. -    dispatcher->destroy_socket (); +    ctx->destroy_socket ();      return 0;  } diff --git a/src/zmq.cpp b/src/zmq.cpp index e97cb64..ecb3d3d 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -29,11 +29,11 @@  #include "streamer.hpp"  #include "socket_base.hpp"  #include "app_thread.hpp" -#include "dispatcher.hpp"  #include "msg_content.hpp"  #include "platform.hpp"  #include "stdint.hpp"  #include "config.hpp" +#include "ctx.hpp"  #include "err.hpp"  #include "fd.hpp" @@ -263,15 +263,14 @@ void *zmq_init (int /*app_threads_*/, int io_threads_, int /*flags_*/)  #endif      //  Create 0MQ context. -    zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t ( -        (uint32_t) io_threads_); -    zmq_assert (dispatcher); -    return (void*) dispatcher; +    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_); +    zmq_assert (ctx); +    return (void*) ctx;  } -int zmq_term (void *dispatcher_) +int zmq_term (void *ctx_)  { -    int rc = ((zmq::dispatcher_t*) dispatcher_)->term (); +    int rc = ((zmq::ctx_t*) ctx_)->term ();      int en = errno;  #if defined ZMQ_HAVE_OPENPGM @@ -284,9 +283,9 @@ int zmq_term (void *dispatcher_)      return rc;  } -void *zmq_socket (void *dispatcher_, int type_) +void *zmq_socket (void *ctx_, int type_)  { -    return (void*) (((zmq::dispatcher_t*) dispatcher_)->create_socket (type_)); +    return (void*) (((zmq::ctx_t*) ctx_)->create_socket (type_));  }  int zmq_close (void *s_) diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index af71229..077286f 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -54,7 +54,7 @@ bool zmq::zmq_encoder_t::message_ready ()      //  Destroy content of the old message.      zmq_msg_close(&in_progress); -    //  Read new message from the dispatcher. If there is none, return false. +    //  Read new message. If there is none, return false.      //  Note that new state is set only if write is successful. That way      //  unsuccessful write will cause retry on the next state machine      //  invocation. | 
