summaryrefslogtreecommitdiff
path: root/src/object.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-08 16:01:58 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-08 16:01:58 +0200
commita8b410e66c3c75809c8e9c01dd3e35c579f02347 (patch)
tree7af63906dce0216f86e5ff0767efaddfd6492cfd /src/object.cpp
parent0b5cc026fbe7ccc6de66907be29471562a2d344d (diff)
lockfree interaction patter for 3 theads implemented
Diffstat (limited to 'src/object.cpp')
-rw-r--r--src/object.cpp158
1 files changed, 47 insertions, 111 deletions
diff --git a/src/object.cpp b/src/object.cpp
index 36f3937..e2267d6 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -18,19 +18,19 @@
*/
#include "object.hpp"
-#include "context.hpp"
+#include "dispatcher.hpp"
#include "err.hpp"
#include "io_thread.hpp"
#include "simple_semaphore.hpp"
-zmq::object_t::object_t (context_t *context_, int thread_slot_) :
- context (context_),
+zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) :
+ dispatcher (dispatcher_),
thread_slot (thread_slot_)
{
}
zmq::object_t::object_t (object_t *parent_) :
- context (parent_->context),
+ dispatcher (parent_->dispatcher),
thread_slot (parent_->thread_slot)
{
}
@@ -41,7 +41,7 @@ zmq::object_t::~object_t ()
int zmq::object_t::thread_slot_count ()
{
- return context->thread_slot_count ();
+ return dispatcher->thread_slot_count ();
}
int zmq::object_t::get_thread_slot ()
@@ -53,45 +53,32 @@ void zmq::object_t::process_command (command_t &cmd_)
{
switch (cmd_.type) {
- case command_t::head:
- process_head (cmd_.args.head.bytes);
+ case command_t::stop:
+ process_stop ();
break;
- case command_t::tail:
- process_tail (cmd_.args.tail.bytes);
- break;
+ case command_t::plug:
+ process_plug ();
+ return;
- case command_t::engine:
- process_engine (cmd_.args.engine.engine);
- break;
+ case command_t::own:
+ process_own (cmd_.args.own.object);
+ return;
case command_t::bind:
- process_bind (cmd_.args.bind.reader, cmd_.args.bind.peer);
- break;
-
- case command_t::reg:
- process_reg (cmd_.args.reg.smph);
- break;
-
- case command_t::reg_and_bind:
- process_reg_and_bind (cmd_.args.reg_and_bind.peer,
- cmd_.args.reg_and_bind.flow_in, cmd_.args.reg_and_bind.flow_out);
- break;
-
- case command_t::unreg:
- process_unreg (cmd_.args.unreg.smph);
- break;
-
- case command_t::terminate:
- process_terminate ();
- break;
+ process_bind ();
+ return;
- case command_t::terminate_ack:
- process_terminate_ack ();
- break;
+ case command_t::term_req:
+ process_term_req (cmd_.args.term_req.object);
+ return;
+
+ case command_t::term:
+ process_term ();
+ return;
- case command_t::stop:
- process_stop ();
+ case command_t::term_ack:
+ process_term_ack ();
return;
default:
@@ -101,7 +88,7 @@ void zmq::object_t::process_command (command_t &cmd_)
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{
- return context->choose_io_thread (taskset_);
+ return dispatcher->choose_io_thread (taskset_);
}
void zmq::object_t::send_stop ()
@@ -111,91 +98,56 @@ void zmq::object_t::send_stop ()
command_t cmd;
cmd.destination = this;
cmd.type = command_t::stop;
- context->write (thread_slot, thread_slot, cmd);
+ dispatcher->write (thread_slot, thread_slot, cmd);
}
-void zmq::object_t::send_bind (object_t *destination_, pipe_reader_t *reader_,
- session_t *peer_)
+void zmq::object_t::send_plug (object_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
- cmd.type = command_t::bind;
- cmd.args.bind.reader = reader_;
- cmd.args.bind.peer = peer_;
+ cmd.type = command_t::plug;
send_command (cmd);
}
-void zmq::object_t::send_head (object_t *destination_, uint64_t bytes_)
+void zmq::object_t::send_own (object_t *destination_, object_t *object_)
{
command_t cmd;
cmd.destination = destination_;
- cmd.type = command_t::head;
- cmd.args.head.bytes = bytes_;
+ cmd.type = command_t::own;
+ cmd.args.own.object = object_;
send_command (cmd);
}
-void zmq::object_t::send_tail (object_t *destination_, uint64_t bytes_)
+void zmq::object_t::send_bind (object_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
- cmd.type = command_t::tail;
- cmd.args.tail.bytes = bytes_;
- send_command (cmd);
-}
-
-void zmq::object_t::send_reg (object_t *destination_, simple_semaphore_t *smph_)
-{
- command_t cmd;
- cmd.destination = destination_;
- cmd.type = command_t::reg;
- cmd.args.reg.smph = smph_;
- send_command (cmd);
-}
-
-void zmq::object_t::send_reg_and_bind (object_t *destination_,
- session_t *peer_, bool flow_in_, bool flow_out_)
-{
- command_t cmd;
- cmd.destination = destination_;
- cmd.type = command_t::reg_and_bind;
- cmd.args.reg_and_bind.peer = peer_;
- cmd.args.reg_and_bind.flow_in = flow_in_;
- cmd.args.reg_and_bind.flow_out = flow_out_;
- send_command (cmd);
-}
-
-void zmq::object_t::send_unreg (object_t *destination_,
- simple_semaphore_t *smph_)
-{
- command_t cmd;
- cmd.destination = destination_;
- cmd.type = command_t::unreg;
- cmd.args.unreg.smph = smph_;
+ cmd.type = command_t::bind;
send_command (cmd);
}
-void zmq::object_t::send_engine (object_t *destination_, i_engine *engine_)
+void zmq::object_t::send_term_req (object_t *destination_, object_t *object_)
{
command_t cmd;
cmd.destination = destination_;
- cmd.type = command_t::engine;
- cmd.args.engine.engine = engine_;
+ cmd.type = command_t::term_req;
+ cmd.args.term_req.object = object_;
send_command (cmd);
}
-void zmq::object_t::send_terminate (object_t *destination_)
+void zmq::object_t::send_term (object_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
- cmd.type = command_t::terminate;
+ cmd.type = command_t::term;
send_command (cmd);
}
-void zmq::object_t::send_terminate_ack (object_t *destination_)
+void zmq::object_t::send_term_ack (object_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
- cmd.type = command_t::terminate_ack;
+ cmd.type = command_t::term_ack;
send_command (cmd);
}
@@ -204,48 +156,32 @@ void zmq::object_t::process_stop ()
zmq_assert (false);
}
-void zmq::object_t::process_bind (pipe_reader_t *reader_, session_t *peer_)
-{
- zmq_assert (false);
-}
-
-void zmq::object_t::process_head (uint64_t bytes_)
-{
- zmq_assert (false);
-}
-
-void zmq::object_t::process_tail (uint64_t bytes_)
-{
- zmq_assert (false);
-}
-
-void zmq::object_t::process_reg (simple_semaphore_t *smph_)
+void zmq::object_t::process_plug ()
{
zmq_assert (false);
}
-void zmq::object_t::process_reg_and_bind (session_t *session_,
- bool flow_in_, bool flow_out_)
+void zmq::object_t::process_own (object_t *object_)
{
zmq_assert (false);
}
-void zmq::object_t::process_unreg (simple_semaphore_t *smph_)
+void zmq::object_t::process_bind ()
{
zmq_assert (false);
}
-void zmq::object_t::process_engine (i_engine *engine_)
+void zmq::object_t::process_term_req (object_t *object_)
{
zmq_assert (false);
}
-void zmq::object_t::process_terminate ()
+void zmq::object_t::process_term ()
{
zmq_assert (false);
}
-void zmq::object_t::process_terminate_ack ()
+void zmq::object_t::process_term_ack ()
{
zmq_assert (false);
}
@@ -256,6 +192,6 @@ void zmq::object_t::send_command (command_t &cmd_)
if (destination_thread_slot == thread_slot)
cmd_.destination->process_command (cmd_);
else
- context->write (thread_slot, destination_thread_slot, cmd_);
+ dispatcher->write (thread_slot, destination_thread_slot, cmd_);
}