summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Makefile.am4
-rw-r--r--src/app_thread.cpp10
-rw-r--r--src/app_thread.hpp2
-rw-r--r--src/ctx.cpp (renamed from src/dispatcher.cpp)30
-rw-r--r--src/ctx.hpp (renamed from src/dispatcher.hpp)23
-rw-r--r--src/io_thread.cpp6
-rw-r--r--src/io_thread.hpp2
-rw-r--r--src/object.cpp28
-rw-r--r--src/object.hpp10
-rw-r--r--src/socket_base.cpp11
-rw-r--r--src/zmq.cpp17
-rw-r--r--src/zmq_encoder.cpp2
12 files changed, 74 insertions, 71 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 2cd5ace..70ae248 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -55,9 +55,9 @@ libzmq_la_SOURCES = app_thread.hpp \
blob.hpp \
command.hpp \
config.hpp \
+ ctx.hpp \
decoder.hpp \
devpoll.hpp \
- dispatcher.hpp \
downstream.hpp \
encoder.hpp \
epoll.hpp \
@@ -122,8 +122,8 @@ libzmq_la_SOURCES = app_thread.hpp \
zmq_listener.hpp \
app_thread.cpp \
command.cpp \
+ ctx.cpp \
devpoll.cpp \
- dispatcher.cpp \
downstream.cpp \
epoll.cpp \
err.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 1c06337..19f997b 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -34,7 +34,7 @@
#endif
#include "app_thread.hpp"
-#include "dispatcher.hpp"
+#include "ctx.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "config.hpp"
@@ -57,9 +57,9 @@
#define ZMQ_DELAY_COMMANDS
#endif
-zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_,
+zmq::app_thread_t::app_thread_t (ctx_t *ctx_,
uint32_t thread_slot_) :
- object_t (dispatcher_, thread_slot_),
+ object_t (ctx_, thread_slot_),
last_processing_time (0),
terminated (false)
{
@@ -163,7 +163,7 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
break;
default:
if (sockets.empty ())
- get_dispatcher ()->no_sockets (this);
+ get_ctx ()->no_sockets (this);
errno = EINVAL;
return NULL;
}
@@ -178,7 +178,7 @@ void zmq::app_thread_t::remove_socket (socket_base_t *socket_)
{
sockets.erase (socket_);
if (sockets.empty ())
- get_dispatcher ()->no_sockets (this);
+ get_ctx ()->no_sockets (this);
}
void zmq::app_thread_t::process_stop ()
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index bca6947..f0deaab 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -34,7 +34,7 @@ namespace zmq
{
public:
- app_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
+ app_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);
~app_thread_t ();
diff --git a/src/dispatcher.cpp b/src/ctx.cpp
index 2ae99ba..f0e177d 100644
--- a/src/dispatcher.cpp
+++ b/src/ctx.cpp
@@ -22,7 +22,7 @@
#include "../include/zmq.h"
-#include "dispatcher.hpp"
+#include "ctx.hpp"
#include "socket_base.hpp"
#include "app_thread.hpp"
#include "io_thread.hpp"
@@ -34,7 +34,7 @@
#include "windows.h"
#endif
-zmq::dispatcher_t::dispatcher_t (uint32_t io_threads_) :
+zmq::ctx_t::ctx_t (uint32_t io_threads_) :
sockets (0),
terminated (false)
{
@@ -65,7 +65,7 @@ zmq::dispatcher_t::dispatcher_t (uint32_t io_threads_) :
}
}
-int zmq::dispatcher_t::term ()
+int zmq::ctx_t::term ()
{
// First send stop command to application threads so that any
// blocking calls are interrupted.
@@ -86,7 +86,7 @@ int zmq::dispatcher_t::term ()
return 0;
}
-zmq::dispatcher_t::~dispatcher_t ()
+zmq::ctx_t::~ctx_t ()
{
// Ask I/O threads to terminate. If stop signal wasn't sent to I/O
// thread subsequent invocation of destructor would hang-up.
@@ -117,7 +117,7 @@ zmq::dispatcher_t::~dispatcher_t ()
#endif
}
-zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
+zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
app_threads_sync.lock ();
@@ -183,7 +183,7 @@ zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
return s;
}
-void zmq::dispatcher_t::destroy_socket ()
+void zmq::ctx_t::destroy_socket ()
{
// If zmq_term was already called and there are no more sockets,
// terminate the whole 0MQ infrastructure.
@@ -197,7 +197,7 @@ void zmq::dispatcher_t::destroy_socket ()
delete this;
}
-void zmq::dispatcher_t::no_sockets (app_thread_t *thread_)
+void zmq::ctx_t::no_sockets (app_thread_t *thread_)
{
app_threads_sync.lock ();
app_threads_t::size_type i;
@@ -210,19 +210,19 @@ void zmq::dispatcher_t::no_sockets (app_thread_t *thread_)
app_threads_sync.unlock ();
}
-void zmq::dispatcher_t::send_command (uint32_t destination_,
+void zmq::ctx_t::send_command (uint32_t destination_,
const command_t &command_)
{
signalers [destination_]->send (command_);
}
-bool zmq::dispatcher_t::recv_command (uint32_t thread_slot_,
+bool zmq::ctx_t::recv_command (uint32_t thread_slot_,
command_t *command_, bool block_)
{
return signalers [thread_slot_]->recv (command_, block_);
}
-zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_)
+zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
{
// Find the I/O thread with minimum load.
zmq_assert (io_threads.size () > 0);
@@ -241,7 +241,7 @@ zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_)
return io_threads [result];
}
-void zmq::dispatcher_t::register_pipe (class pipe_t *pipe_)
+void zmq::ctx_t::register_pipe (class pipe_t *pipe_)
{
pipes_sync.lock ();
bool inserted = pipes.insert (pipe_).second;
@@ -249,7 +249,7 @@ void zmq::dispatcher_t::register_pipe (class pipe_t *pipe_)
pipes_sync.unlock ();
}
-void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_)
+void zmq::ctx_t::unregister_pipe (class pipe_t *pipe_)
{
pipes_sync.lock ();
pipes_t::size_type erased = pipes.erase (pipe_);
@@ -257,7 +257,7 @@ void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_)
pipes_sync.unlock ();
}
-int zmq::dispatcher_t::register_endpoint (const char *addr_,
+int zmq::ctx_t::register_endpoint (const char *addr_,
socket_base_t *socket_)
{
endpoints_sync.lock ();
@@ -274,7 +274,7 @@ int zmq::dispatcher_t::register_endpoint (const char *addr_,
return 0;
}
-void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_)
+void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
{
endpoints_sync.lock ();
@@ -292,7 +292,7 @@ void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_)
endpoints_sync.unlock ();
}
-zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_)
+zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_)
{
endpoints_sync.lock ();
diff --git a/src/dispatcher.hpp b/src/ctx.hpp
index cad4844..c96a923 100644
--- a/src/dispatcher.hpp
+++ b/src/ctx.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__
-#define __ZMQ_DISPATCHER_HPP_INCLUDED__
+#ifndef __ZMQ_CTX_HPP_INCLUDED__
+#define __ZMQ_CTX_HPP_INCLUDED__
#include <vector>
#include <set>
@@ -34,14 +34,17 @@
namespace zmq
{
+
+ // Context object encapsulates all the global state associated with
+ // the library.
- class dispatcher_t
+ class ctx_t
{
public:
- // Create the dispatcher object. The argument specifies the size
+ // Create the context object. The argument specifies the size
// of I/O thread pool to create.
- dispatcher_t (uint32_t io_threads_);
+ ctx_t (uint32_t io_threads_);
// This function is called when user invokes zmq_term. If there are
// no more sockets open it'll cause all the infrastructure to be shut
@@ -70,7 +73,7 @@ namespace zmq
// Taskset specifies which I/O threads are eligible (0 = all).
class io_thread_t *choose_io_thread (uint64_t taskset_);
- // All pipes are registered with the dispatcher so that even the
+ // All pipes are registered with the context so that even the
// orphaned pipes can be deallocated on the terminal shutdown.
void register_pipe (class pipe_t *pipe_);
void unregister_pipe (class pipe_t *pipe_);
@@ -82,7 +85,7 @@ namespace zmq
private:
- ~dispatcher_t ();
+ ~ctx_t ();
struct app_thread_info_t
{
@@ -116,7 +119,7 @@ namespace zmq
// As pipes may reside in orphaned state in particular moments
// of the pipe shutdown process, i.e. neither pipe reader nor
// pipe writer hold reference to the pipe, we have to hold references
- // to all pipes in dispatcher so that we can deallocate them
+ // to all pipes in context so that we can deallocate them
// during terminal shutdown even though it conincides with the
// pipe being in the orphaned state.
typedef std::set <class pipe_t*> pipes_t;
@@ -143,8 +146,8 @@ namespace zmq
// Synchronisation of access to the list of inproc endpoints.
mutex_t endpoints_sync;
- dispatcher_t (const dispatcher_t&);
- void operator = (const dispatcher_t&);
+ ctx_t (const ctx_t&);
+ void operator = (const ctx_t&);
};
}
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index 92c314a..fac6961 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -24,11 +24,11 @@
#include "io_thread.hpp"
#include "platform.hpp"
#include "err.hpp"
-#include "dispatcher.hpp"
+#include "ctx.hpp"
-zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_,
+zmq::io_thread_t::io_thread_t (ctx_t *ctx_,
uint32_t thread_slot_) :
- object_t (dispatcher_, thread_slot_)
+ object_t (ctx_, thread_slot_)
{
poller = new (std::nothrow) poller_t;
zmq_assert (poller);
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index 7e105b3..3d832c0 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -38,7 +38,7 @@ namespace zmq
{
public:
- io_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
+ io_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);
// Clean-up. If the thread was started, it's neccessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up.
diff --git a/src/object.cpp b/src/object.cpp
index c5c89cb..324450f 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -20,7 +20,7 @@
#include <string.h>
#include "object.hpp"
-#include "dispatcher.hpp"
+#include "ctx.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "io_thread.hpp"
@@ -28,14 +28,14 @@
#include "session.hpp"
#include "socket_base.hpp"
-zmq::object_t::object_t (dispatcher_t *dispatcher_, uint32_t thread_slot_) :
- dispatcher (dispatcher_),
+zmq::object_t::object_t (ctx_t *ctx_, uint32_t thread_slot_) :
+ ctx (ctx_),
thread_slot (thread_slot_)
{
}
zmq::object_t::object_t (object_t *parent_) :
- dispatcher (parent_->dispatcher),
+ ctx (parent_->ctx),
thread_slot (parent_->thread_slot)
{
}
@@ -49,9 +49,9 @@ uint32_t zmq::object_t::get_thread_slot ()
return thread_slot;
}
-zmq::dispatcher_t *zmq::object_t::get_dispatcher ()
+zmq::ctx_t *zmq::object_t::get_ctx ()
{
- return dispatcher;
+ return ctx;
}
void zmq::object_t::process_command (command_t &cmd_)
@@ -125,32 +125,32 @@ void zmq::object_t::process_command (command_t &cmd_)
void zmq::object_t::register_pipe (class pipe_t *pipe_)
{
- dispatcher->register_pipe (pipe_);
+ ctx->register_pipe (pipe_);
}
void zmq::object_t::unregister_pipe (class pipe_t *pipe_)
{
- dispatcher->unregister_pipe (pipe_);
+ ctx->unregister_pipe (pipe_);
}
int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_)
{
- return dispatcher->register_endpoint (addr_, socket_);
+ return ctx->register_endpoint (addr_, socket_);
}
void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
{
- return dispatcher->unregister_endpoints (socket_);
+ return ctx->unregister_endpoints (socket_);
}
zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_)
{
- return dispatcher->find_endpoint (addr_);
+ return ctx->find_endpoint (addr_);
}
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{
- return dispatcher->choose_io_thread (taskset_);
+ return ctx->choose_io_thread (taskset_);
}
void zmq::object_t::send_stop ()
@@ -160,7 +160,7 @@ void zmq::object_t::send_stop ()
command_t cmd;
cmd.destination = this;
cmd.type = command_t::stop;
- dispatcher->send_command (thread_slot, cmd);
+ ctx->send_command (thread_slot, cmd);
}
void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_)
@@ -369,6 +369,6 @@ void zmq::object_t::process_seqnum ()
void zmq::object_t::send_command (command_t &cmd_)
{
- dispatcher->send_command (cmd_.destination->get_thread_slot (), cmd_);
+ ctx->send_command (cmd_.destination->get_thread_slot (), cmd_);
}
diff --git a/src/object.hpp b/src/object.hpp
index 0084e1a..a38b0a6 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -32,15 +32,15 @@ namespace zmq
{
public:
- object_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
+ object_t (class ctx_t *ctx_, uint32_t thread_slot_);
object_t (object_t *parent_);
virtual ~object_t ();
uint32_t get_thread_slot ();
- dispatcher_t *get_dispatcher ();
+ ctx_t *get_ctx ();
void process_command (struct command_t &cmd_);
- // Allow pipe to access corresponding dispatcher functions.
+ // Allow pipe to access corresponding context functions.
void register_pipe (class pipe_t *pipe_);
void unregister_pipe (class pipe_t *pipe_);
@@ -101,8 +101,8 @@ namespace zmq
private:
- // Pointer to the root of the infrastructure.
- class dispatcher_t *dispatcher;
+ // Context provides access to the global state.
+ class ctx_t *ctx;
// Slot ID of the thread the object belongs to.
uint32_t thread_slot;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index b186683..e946526 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -25,7 +25,7 @@
#include "socket_base.hpp"
#include "app_thread.hpp"
-#include "dispatcher.hpp"
+
#include "zmq_listener.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp"
@@ -34,6 +34,7 @@
#include "owned.hpp"
#include "pipe.hpp"
#include "err.hpp"
+#include "ctx.hpp"
#include "platform.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
@@ -456,14 +457,14 @@ int zmq::socket_base_t::close ()
// Let the thread know that the socket is no longer available.
app_thread->remove_socket (this);
- // Pointer to the dispatcher must be retrieved before the socket is
+ // Pointer to the context must be retrieved before the socket is
// deallocated. Afterwards it is not available.
- dispatcher_t *dispatcher = get_dispatcher ();
+ ctx_t *ctx = get_ctx ();
// Unregister all inproc endpoints associated with this socket.
// From this point we are sure that inc_seqnum won't be called again
// on this object.
- dispatcher->unregister_endpoints (this);
+ ctx->unregister_endpoints (this);
// Wait till all undelivered commands are delivered. This should happen
// very quickly. There's no way to wait here for extensive period of time.
@@ -503,7 +504,7 @@ int zmq::socket_base_t::close ()
// This function must be called after the socket is completely deallocated
// as it may cause termination of the whole 0MQ infrastructure.
- dispatcher->destroy_socket ();
+ ctx->destroy_socket ();
return 0;
}
diff --git a/src/zmq.cpp b/src/zmq.cpp
index e97cb64..ecb3d3d 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -29,11 +29,11 @@
#include "streamer.hpp"
#include "socket_base.hpp"
#include "app_thread.hpp"
-#include "dispatcher.hpp"
#include "msg_content.hpp"
#include "platform.hpp"
#include "stdint.hpp"
#include "config.hpp"
+#include "ctx.hpp"
#include "err.hpp"
#include "fd.hpp"
@@ -263,15 +263,14 @@ void *zmq_init (int /*app_threads_*/, int io_threads_, int /*flags_*/)
#endif
// Create 0MQ context.
- zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t (
- (uint32_t) io_threads_);
- zmq_assert (dispatcher);
- return (void*) dispatcher;
+ zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_);
+ zmq_assert (ctx);
+ return (void*) ctx;
}
-int zmq_term (void *dispatcher_)
+int zmq_term (void *ctx_)
{
- int rc = ((zmq::dispatcher_t*) dispatcher_)->term ();
+ int rc = ((zmq::ctx_t*) ctx_)->term ();
int en = errno;
#if defined ZMQ_HAVE_OPENPGM
@@ -284,9 +283,9 @@ int zmq_term (void *dispatcher_)
return rc;
}
-void *zmq_socket (void *dispatcher_, int type_)
+void *zmq_socket (void *ctx_, int type_)
{
- return (void*) (((zmq::dispatcher_t*) dispatcher_)->create_socket (type_));
+ return (void*) (((zmq::ctx_t*) ctx_)->create_socket (type_));
}
int zmq_close (void *s_)
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index af71229..077286f 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -54,7 +54,7 @@ bool zmq::zmq_encoder_t::message_ready ()
// Destroy content of the old message.
zmq_msg_close(&in_progress);
- // Read new message from the dispatcher. If there is none, return false.
+ // Read new message. If there is none, return false.
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.