summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-06 10:47:34 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-06 10:47:34 +0200
commitb8b4acef4c2ba1a169ce84c1fb4c70a5676ebba3 (patch)
treead29407ed2db35765fda969e297f7f0efd9ff39a
parent43fa72b7ee6b6d97b84a555ce8902cee855aeb72 (diff)
dispatcher renamed to context
-rw-r--r--src/Makefile.am4
-rw-r--r--src/app_thread.cpp8
-rw-r--r--src/app_thread.hpp4
-rw-r--r--src/context.cpp (renamed from src/dispatcher.cpp)28
-rw-r--r--src/context.hpp (renamed from src/dispatcher.hpp)24
-rw-r--r--src/io_thread.cpp8
-rw-r--r--src/io_thread.hpp2
-rw-r--r--src/object.cpp26
-rw-r--r--src/object.hpp6
-rw-r--r--src/pipe.hpp8
-rw-r--r--src/pipe_reader.cpp2
-rw-r--r--src/pipe_reader.hpp6
-rw-r--r--src/pipe_writer.hpp6
-rw-r--r--src/safe_object.cpp6
-rw-r--r--src/safe_object.hpp2
-rw-r--r--src/zmq.cpp15
16 files changed, 77 insertions, 78 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index e6d09ca..27f4412 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -8,10 +8,10 @@ libzmq_la_SOURCES = \
command.hpp \
config.hpp \
connecter.hpp \
+ context.hpp \
data_distributor.hpp \
decoder.hpp \
devpoll.hpp \
- dispatcher.hpp \
dummy_aggregator.hpp \
dummy_distributor.hpp \
encoder.hpp \
@@ -70,9 +70,9 @@ libzmq_la_SOURCES = \
zmq_tcp_engine.hpp \
app_thread.cpp \
connecter.cpp \
+ context.cpp \
data_distributor.cpp \
devpoll.hpp \
- dispatcher.cpp \
dummy_aggregator.cpp \
dummy_distributor.cpp \
epoll.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 2406dbd..9cc61c7 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -26,7 +26,7 @@
#endif
#include "app_thread.hpp"
-#include "dispatcher.hpp"
+#include "context.hpp"
#include "err.hpp"
#include "session.hpp"
#include "pipe.hpp"
@@ -51,8 +51,8 @@
#define ZMQ_DELAY_COMMANDS
#endif
-zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
- object_t (dispatcher_, thread_slot_),
+zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) :
+ object_t (context_, thread_slot_),
tid (0),
last_processing_time (0)
{
@@ -213,7 +213,7 @@ void zmq::app_thread_t::process_commands (bool block_)
for (int i = 0; i != thread_slot_count (); i++) {
if (signals & (ypollset_t::signals_t (1) << i)) {
command_t cmd;
- while (dispatcher->read (i, get_thread_slot (), &cmd))
+ while (context->read (i, get_thread_slot (), &cmd))
cmd.destination->process_command (cmd);
}
}
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index ffe5596..8295c2f 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -34,7 +34,7 @@ namespace zmq
{
public:
- app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
+ app_thread_t (class context_t *context_, int thread_slot_);
// To be called when the whole infrastrucure is being closed.
void shutdown ();
@@ -47,7 +47,7 @@ namespace zmq
struct i_api *create_socket (int type_);
// Nota bene: The following two functions are accessed from different
- // threads. The caller (dispatcher) is responsible for synchronisation
+ // threads. The caller (context) is responsible for synchronisation
// of accesses.
// Returns true is current thread is associated with the app thread.
diff --git a/src/dispatcher.cpp b/src/context.cpp
index 56a5e0b..ab4643e 100644
--- a/src/dispatcher.cpp
+++ b/src/context.cpp
@@ -19,7 +19,7 @@
#include "../include/zmq.h"
-#include "dispatcher.hpp"
+#include "context.hpp"
#include "app_thread.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
@@ -34,7 +34,7 @@
#include "windows.h"
#endif
-zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_)
+zmq::context_t::context_t (int app_threads_, int io_threads_)
{
#ifdef ZMQ_HAVE_WINDOWS
// Intialise Windows sockets. Note that WSAStartup can be called multiple
@@ -72,12 +72,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_)
io_threads [i]->start ();
}
-void zmq::dispatcher_t::shutdown ()
+void zmq::context_t::shutdown ()
{
delete this;
}
-zmq::dispatcher_t::~dispatcher_t ()
+zmq::context_t::~context_t ()
{
// Ask I/O threads to terminate.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
@@ -110,12 +110,12 @@ zmq::dispatcher_t::~dispatcher_t ()
#endif
}
-int zmq::dispatcher_t::thread_slot_count ()
+int zmq::context_t::thread_slot_count ()
{
return signalers.size ();
}
-zmq::i_api *zmq::dispatcher_t::create_socket (int type_)
+zmq::i_api *zmq::context_t::create_socket (int type_)
{
threads_sync.lock ();
app_thread_t *thread = choose_app_thread ();
@@ -128,14 +128,14 @@ zmq::i_api *zmq::dispatcher_t::create_socket (int type_)
return s;
}
-zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()
+zmq::app_thread_t *zmq::context_t::choose_app_thread ()
{
// Check whether thread ID is already assigned. If so, return it.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
if (app_threads [i]->is_current ())
return app_threads [i];
- // Check whether there's an unused thread slot in the dispatcher.
+ // Check whether there's an unused thread slot in the cotext.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
if (app_threads [i]->make_current ())
return app_threads [i];
@@ -145,7 +145,7 @@ zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()
return NULL;
}
-zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_)
+zmq::io_thread_t *zmq::context_t::choose_io_thread (uint64_t taskset_)
{
zmq_assert (io_threads.size () > 0);
@@ -165,7 +165,7 @@ zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_)
return io_threads [result];
}
-void zmq::dispatcher_t::create_pipe (object_t *reader_parent_,
+void zmq::context_t::create_pipe (object_t *reader_parent_,
object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
pipe_reader_t **reader_, pipe_writer_t **writer_)
{
@@ -191,7 +191,7 @@ void zmq::dispatcher_t::create_pipe (object_t *reader_parent_,
*writer_ = writer;
}
-void zmq::dispatcher_t::destroy_pipe (pipe_t *pipe_)
+void zmq::context_t::destroy_pipe (pipe_t *pipe_)
{
// Remove the pipe from the repository.
pipe_info_t info;
@@ -209,7 +209,7 @@ void zmq::dispatcher_t::destroy_pipe (pipe_t *pipe_)
delete info.writer;
}
-int zmq::dispatcher_t::register_inproc_endpoint (const char *endpoint_,
+int zmq::context_t::register_inproc_endpoint (const char *endpoint_,
session_t *session_)
{
inproc_endpoint_sync.lock ();
@@ -227,7 +227,7 @@ int zmq::dispatcher_t::register_inproc_endpoint (const char *endpoint_,
return 0;
}
-zmq::object_t *zmq::dispatcher_t::get_inproc_endpoint (const char *endpoint_)
+zmq::object_t *zmq::context_t::get_inproc_endpoint (const char *endpoint_)
{
inproc_endpoint_sync.lock ();
inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_);
@@ -245,7 +245,7 @@ zmq::object_t *zmq::dispatcher_t::get_inproc_endpoint (const char *endpoint_)
return session;
}
-void zmq::dispatcher_t::unregister_inproc_endpoints (session_t *session_)
+void zmq::context_t::unregister_inproc_endpoints (session_t *session_)
{
inproc_endpoint_sync.lock ();
diff --git a/src/dispatcher.hpp b/src/context.hpp
index 07c35cd..7701ef7 100644
--- a/src/dispatcher.hpp
+++ b/src/context.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__
-#define __ZMQ_DISPATCHER_HPP_INCLUDED__
+#ifndef __ZMQ_CONTEXT_HPP_INCLUDED__
+#define __ZMQ_CONTEXT_HPP_INCLUDED__
#include <vector>
#include <map>
@@ -37,19 +37,19 @@ namespace zmq
// Dispatcher implements bidirectional thread-safe passing of commands
// between N threads. It consists of a ypipes to pass commands and
// signalers to wake up the receiver thread when new commands are
- // available. Note that dispatcher is inefficient for passing messages
+ // available. Note that context is inefficient for passing messages
// within a thread (sender thread = receiver thread). The optimisation is
// not part of the class and should be implemented by individual threads
// (presumably by calling the command handling function directly).
- class dispatcher_t
+ class context_t
{
public:
- // Create the dispatcher object. Matrix of pipes to communicate between
+ // Create the context object. Matrix of pipes to communicate between
// each socket and each I/O thread is created along with appropriate
// signalers.
- dispatcher_t (int app_threads_, int io_threads_);
+ context_t (int app_threads_, int io_threads_);
// To be called to terminate the whole infrastructure (zmq_term).
void shutdown ();
@@ -57,12 +57,12 @@ namespace zmq
// Create a socket engine.
struct i_api *create_socket (int type_);
- // Returns number of thread slots in the dispatcher. To be used by
+ // Returns number of thread slots in the context. To be used by
// individual threads to find out how many distinct signals can be
// received.
int thread_slot_count ();
- // Write command to the dispatcher.
+ // Send command from the source to the destination.
inline void write (int source_, int destination_,
const command_t &command_)
{
@@ -73,7 +73,7 @@ namespace zmq
signalers [destination_]->signal (source_);
}
- // Read command from the dispatcher. Returns false if there is no
+ // Receive command from the source. Returns false if there is no
// command available.
inline bool read (int source_, int destination_, command_t *command_)
{
@@ -110,7 +110,7 @@ namespace zmq
private:
// Clean-up.
- ~dispatcher_t ();
+ ~context_t ();
// Returns the app thread associated with the current thread.
// NULL if we are out of app thread slots.
@@ -160,8 +160,8 @@ namespace zmq
// of inproc endpoints.
mutex_t inproc_endpoint_sync;
- dispatcher_t (const dispatcher_t&);
- void operator = (const dispatcher_t&);
+ context_t (const context_t&);
+ void operator = (const context_t&);
};
}
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index 045627c..162ed4c 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -29,13 +29,13 @@
#include "select.hpp"
#include "devpoll.hpp"
#include "kqueue.hpp"
-#include "dispatcher.hpp"
+#include "context.hpp"
#include "session.hpp"
#include "simple_semaphore.hpp"
#include "session.hpp"
-zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
- object_t (dispatcher_, thread_slot_)
+zmq::io_thread_t::io_thread_t (context_t *context_, int thread_slot_) :
+ object_t (context_, thread_slot_)
{
#if defined ZMQ_FORCE_SELECT
poller = new select_t;
@@ -131,7 +131,7 @@ void zmq::io_thread_t::in_event ()
// Read all the commands from particular thread.
command_t cmd;
- while (dispatcher->read (source_thread_slot, thread_slot, &cmd))
+ while (context->read (source_thread_slot, thread_slot, &cmd))
cmd.destination->process_command (cmd);
}
}
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index afb8110..585a28b 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -38,7 +38,7 @@ namespace zmq
{
public:
- io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
+ io_thread_t (class context_t *context_, int thread_slot_);
// Launch the physical thread.
void start ();
diff --git a/src/object.cpp b/src/object.cpp
index a9370ab..7c85212 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -18,7 +18,7 @@
*/
#include "object.hpp"
-#include "dispatcher.hpp"
+#include "context.hpp"
#include "err.hpp"
#include "pipe_reader.hpp"
#include "pipe_writer.hpp"
@@ -27,14 +27,14 @@
#include "simple_semaphore.hpp"
#include "i_engine.hpp"
-zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) :
- dispatcher (dispatcher_),
+zmq::object_t::object_t (context_t *context_, int thread_slot_) :
+ context (context_),
thread_slot (thread_slot_)
{
}
zmq::object_t::object_t (object_t *parent_) :
- dispatcher (parent_->dispatcher),
+ context (parent_->context),
thread_slot (parent_->thread_slot)
{
}
@@ -45,7 +45,7 @@ zmq::object_t::~object_t ()
int zmq::object_t::thread_slot_count ()
{
- return dispatcher->thread_slot_count ();
+ return context->thread_slot_count ();
}
int zmq::object_t::get_thread_slot ()
@@ -107,34 +107,34 @@ void zmq::object_t::create_pipe (object_t *reader_parent_,
object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
pipe_reader_t **reader_, pipe_writer_t **writer_)
{
- dispatcher->create_pipe (reader_parent_, writer_parent_, hwm_, lwm_,
+ context->create_pipe (reader_parent_, writer_parent_, hwm_, lwm_,
reader_, writer_);
}
void zmq::object_t::destroy_pipe (pipe_t *pipe_)
{
- dispatcher->destroy_pipe (pipe_);
+ context->destroy_pipe (pipe_);
}
int zmq::object_t::register_inproc_endpoint (const char *endpoint_,
session_t *session_)
{
- return dispatcher->register_inproc_endpoint (endpoint_, session_);
+ return context->register_inproc_endpoint (endpoint_, session_);
}
zmq::object_t *zmq::object_t::get_inproc_endpoint (const char *endpoint_)
{
- return dispatcher->get_inproc_endpoint (endpoint_);
+ return context->get_inproc_endpoint (endpoint_);
}
void zmq::object_t::unregister_inproc_endpoints (session_t *session_)
{
- dispatcher->unregister_inproc_endpoints (session_);
+ context->unregister_inproc_endpoints (session_);
}
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{
- return dispatcher->choose_io_thread (taskset_);
+ return context->choose_io_thread (taskset_);
}
void zmq::object_t::send_stop ()
@@ -144,7 +144,7 @@ void zmq::object_t::send_stop ()
command_t cmd;
cmd.destination = this;
cmd.type = command_t::stop;
- dispatcher->write (thread_slot, thread_slot, cmd);
+ context->write (thread_slot, thread_slot, cmd);
}
void zmq::object_t::send_bind (object_t *destination_, pipe_reader_t *reader_,
@@ -289,6 +289,6 @@ void zmq::object_t::send_command (command_t &cmd_)
if (destination_thread_slot == thread_slot)
cmd_.destination->process_command (cmd_);
else
- dispatcher->write (thread_slot, destination_thread_slot, cmd_);
+ context->write (thread_slot, destination_thread_slot, cmd_);
}
diff --git a/src/object.hpp b/src/object.hpp
index b2ae334..796e7fa 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -32,7 +32,7 @@ namespace zmq
{
public:
- object_t (class dispatcher_t *dispatcher_, int thread_slot_);
+ object_t (class context_t *context_, int thread_slot_);
object_t (object_t *parent_);
~object_t ();
@@ -42,7 +42,7 @@ namespace zmq
protected:
// Derived object can use following functions to interact with
- // global repositories. See dispatcher.hpp for function details.
+ // global repositories. See context.hpp for function details.
int thread_slot_count ();
void create_pipe (class object_t *reader_parent_,
class object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
@@ -87,7 +87,7 @@ namespace zmq
virtual void process_terminate_ack ();
// Pointer to the root of the infrastructure.
- class dispatcher_t *dispatcher;
+ class context_t *context;
// Slot ID of the thread the object belongs to.
int thread_slot;
diff --git a/src/pipe.hpp b/src/pipe.hpp
index 16ac837..8894a22 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -32,10 +32,10 @@ namespace zmq
class pipe_t : public ypipe_t <zmq_msg, false, message_pipe_granularity>
{
- // Dispatcher is a friend so that it can create & destroy the pipes.
+ // Context is a friend so that it can create & destroy the pipes.
// By making constructor & destructor private we are sure that nobody
- // except dispatcher messes with pipes.
- friend class dispatcher_t;
+ // except context messes with pipes.
+ friend class context_t;
private:
@@ -45,7 +45,7 @@ namespace zmq
void set_index (int index_);
int get_index ();
- // Index of the pipe in dispatcher's array of pipes.
+ // Index of the pipe in context's array of pipes.
int index;
pipe_t (const pipe_t&);
diff --git a/src/pipe_reader.cpp b/src/pipe_reader.cpp
index eea1371..79dfe2e 100644
--- a/src/pipe_reader.cpp
+++ b/src/pipe_reader.cpp
@@ -113,6 +113,6 @@ void zmq::pipe_reader_t::terminate ()
void zmq::pipe_reader_t::process_terminate_ack ()
{
- // Ask dispatcher to deallocate the pipe.
+ // Ask context to deallocate the pipe.
destroy_pipe (pipe);
}
diff --git a/src/pipe_reader.hpp b/src/pipe_reader.hpp
index 4f85988..cf45bb4 100644
--- a/src/pipe_reader.hpp
+++ b/src/pipe_reader.hpp
@@ -28,10 +28,10 @@ namespace zmq
class pipe_reader_t : public object_t
{
- // Dispatcher is a friend so that it can create & destroy the reader.
+ // Context is a friend so that it can create & destroy the reader.
// By making constructor & destructor private we are sure that nobody
- // except dispatcher messes with readers.
- friend class dispatcher_t;
+ // except context messes with readers.
+ friend class context_t;
public:
diff --git a/src/pipe_writer.hpp b/src/pipe_writer.hpp
index 2c5132e..a727b1f 100644
--- a/src/pipe_writer.hpp
+++ b/src/pipe_writer.hpp
@@ -28,10 +28,10 @@ namespace zmq
class pipe_writer_t : public object_t
{
- // Dispatcher is a friend so that it can create & destroy the writer.
+ // Context is a friend so that it can create & destroy the writer.
// By making constructor & destructor private we are sure that nobody
- // except dispatcher messes with writers.
- friend class dispatcher_t;
+ // except context messes with writers.
+ friend class context_t;
public:
diff --git a/src/safe_object.cpp b/src/safe_object.cpp
index 5a5ab8b..d4a92d7 100644
--- a/src/safe_object.cpp
+++ b/src/safe_object.cpp
@@ -19,9 +19,9 @@
#include "safe_object.hpp"
-zmq::safe_object_t::safe_object_t (class dispatcher_t *dispatcher_,
- int thread_slot_) :
- object_t (dispatcher_, thread_slot_),
+zmq::safe_object_t::safe_object_t (class context_t *context_,
+ int thread_slot_) :
+ object_t (context_, thread_slot_),
processed_seqnum (0),
terminating (false)
{
diff --git a/src/safe_object.hpp b/src/safe_object.hpp
index 8bdd41c..b47db48 100644
--- a/src/safe_object.hpp
+++ b/src/safe_object.hpp
@@ -36,7 +36,7 @@ namespace zmq
{
public:
- safe_object_t (class dispatcher_t *dispatcher_, int thread_slot_);
+ safe_object_t (class context_t *context_, int thread_slot_);
safe_object_t (object_t *parent_);
void inc_seqnum ();
diff --git a/src/zmq.cpp b/src/zmq.cpp
index a7fd486..0fb6fe1 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -25,7 +25,7 @@
#include "i_api.hpp"
#include "err.hpp"
-#include "dispatcher.hpp"
+#include "context.hpp"
#include "msg.hpp"
int zmq_msg_init (zmq_msg *msg_)
@@ -162,28 +162,27 @@ int zmq_msg_type (zmq_msg *msg_)
void *zmq_init (int app_threads_, int io_threads_)
{
- // There should be at least a single thread managed by the dispatcher.
+ // There should be at least a single thread managed by the context.
if (app_threads_ < 0 || io_threads_ < 0 ||
app_threads_ + io_threads_ == 0) {
errno = EINVAL;
return NULL;
}
- zmq::dispatcher_t *dispatcher =
- new zmq::dispatcher_t (app_threads_, io_threads_);
- zmq_assert (dispatcher);
- return (void*) dispatcher;
+ zmq::context_t *context = new zmq::context_t (app_threads_, io_threads_);
+ zmq_assert (context);
+ return (void*) context;
}
int zmq_term (void *context_)
{
- ((zmq::dispatcher_t*) context_)->shutdown ();
+ ((zmq::context_t*) context_)->shutdown ();
return 0;
}
void *zmq_socket (void *context_, int type_)
{
- return (void*) (((zmq::dispatcher_t*) context_)->create_socket (type_));
+ return (void*) (((zmq::context_t*) context_)->create_socket (type_));
}
int zmq_close (void *s_)