From a8b410e66c3c75809c8e9c01dd3e35c579f02347 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 8 Aug 2009 16:01:58 +0200 Subject: lockfree interaction patter for 3 theads implemented --- src/object.cpp | 158 +++++++++++++++++---------------------------------------- 1 file changed, 47 insertions(+), 111 deletions(-) (limited to 'src/object.cpp') diff --git a/src/object.cpp b/src/object.cpp index 36f3937..e2267d6 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -18,19 +18,19 @@ */ #include "object.hpp" -#include "context.hpp" +#include "dispatcher.hpp" #include "err.hpp" #include "io_thread.hpp" #include "simple_semaphore.hpp" -zmq::object_t::object_t (context_t *context_, int thread_slot_) : - context (context_), +zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) : + dispatcher (dispatcher_), thread_slot (thread_slot_) { } zmq::object_t::object_t (object_t *parent_) : - context (parent_->context), + dispatcher (parent_->dispatcher), thread_slot (parent_->thread_slot) { } @@ -41,7 +41,7 @@ zmq::object_t::~object_t () int zmq::object_t::thread_slot_count () { - return context->thread_slot_count (); + return dispatcher->thread_slot_count (); } int zmq::object_t::get_thread_slot () @@ -53,45 +53,32 @@ void zmq::object_t::process_command (command_t &cmd_) { switch (cmd_.type) { - case command_t::head: - process_head (cmd_.args.head.bytes); + case command_t::stop: + process_stop (); break; - case command_t::tail: - process_tail (cmd_.args.tail.bytes); - break; + case command_t::plug: + process_plug (); + return; - case command_t::engine: - process_engine (cmd_.args.engine.engine); - break; + case command_t::own: + process_own (cmd_.args.own.object); + return; case command_t::bind: - process_bind (cmd_.args.bind.reader, cmd_.args.bind.peer); - break; - - case command_t::reg: - process_reg (cmd_.args.reg.smph); - break; - - case command_t::reg_and_bind: - process_reg_and_bind (cmd_.args.reg_and_bind.peer, - cmd_.args.reg_and_bind.flow_in, cmd_.args.reg_and_bind.flow_out); - break; - - case command_t::unreg: - process_unreg (cmd_.args.unreg.smph); - break; - - case command_t::terminate: - process_terminate (); - break; + process_bind (); + return; - case command_t::terminate_ack: - process_terminate_ack (); - break; + case command_t::term_req: + process_term_req (cmd_.args.term_req.object); + return; + + case command_t::term: + process_term (); + return; - case command_t::stop: - process_stop (); + case command_t::term_ack: + process_term_ack (); return; default: @@ -101,7 +88,7 @@ void zmq::object_t::process_command (command_t &cmd_) zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) { - return context->choose_io_thread (taskset_); + return dispatcher->choose_io_thread (taskset_); } void zmq::object_t::send_stop () @@ -111,91 +98,56 @@ void zmq::object_t::send_stop () command_t cmd; cmd.destination = this; cmd.type = command_t::stop; - context->write (thread_slot, thread_slot, cmd); + dispatcher->write (thread_slot, thread_slot, cmd); } -void zmq::object_t::send_bind (object_t *destination_, pipe_reader_t *reader_, - session_t *peer_) +void zmq::object_t::send_plug (object_t *destination_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::bind; - cmd.args.bind.reader = reader_; - cmd.args.bind.peer = peer_; + cmd.type = command_t::plug; send_command (cmd); } -void zmq::object_t::send_head (object_t *destination_, uint64_t bytes_) +void zmq::object_t::send_own (object_t *destination_, object_t *object_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::head; - cmd.args.head.bytes = bytes_; + cmd.type = command_t::own; + cmd.args.own.object = object_; send_command (cmd); } -void zmq::object_t::send_tail (object_t *destination_, uint64_t bytes_) +void zmq::object_t::send_bind (object_t *destination_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::tail; - cmd.args.tail.bytes = bytes_; - send_command (cmd); -} - -void zmq::object_t::send_reg (object_t *destination_, simple_semaphore_t *smph_) -{ - command_t cmd; - cmd.destination = destination_; - cmd.type = command_t::reg; - cmd.args.reg.smph = smph_; - send_command (cmd); -} - -void zmq::object_t::send_reg_and_bind (object_t *destination_, - session_t *peer_, bool flow_in_, bool flow_out_) -{ - command_t cmd; - cmd.destination = destination_; - cmd.type = command_t::reg_and_bind; - cmd.args.reg_and_bind.peer = peer_; - cmd.args.reg_and_bind.flow_in = flow_in_; - cmd.args.reg_and_bind.flow_out = flow_out_; - send_command (cmd); -} - -void zmq::object_t::send_unreg (object_t *destination_, - simple_semaphore_t *smph_) -{ - command_t cmd; - cmd.destination = destination_; - cmd.type = command_t::unreg; - cmd.args.unreg.smph = smph_; + cmd.type = command_t::bind; send_command (cmd); } -void zmq::object_t::send_engine (object_t *destination_, i_engine *engine_) +void zmq::object_t::send_term_req (object_t *destination_, object_t *object_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::engine; - cmd.args.engine.engine = engine_; + cmd.type = command_t::term_req; + cmd.args.term_req.object = object_; send_command (cmd); } -void zmq::object_t::send_terminate (object_t *destination_) +void zmq::object_t::send_term (object_t *destination_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::terminate; + cmd.type = command_t::term; send_command (cmd); } -void zmq::object_t::send_terminate_ack (object_t *destination_) +void zmq::object_t::send_term_ack (object_t *destination_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::terminate_ack; + cmd.type = command_t::term_ack; send_command (cmd); } @@ -204,48 +156,32 @@ void zmq::object_t::process_stop () zmq_assert (false); } -void zmq::object_t::process_bind (pipe_reader_t *reader_, session_t *peer_) -{ - zmq_assert (false); -} - -void zmq::object_t::process_head (uint64_t bytes_) -{ - zmq_assert (false); -} - -void zmq::object_t::process_tail (uint64_t bytes_) -{ - zmq_assert (false); -} - -void zmq::object_t::process_reg (simple_semaphore_t *smph_) +void zmq::object_t::process_plug () { zmq_assert (false); } -void zmq::object_t::process_reg_and_bind (session_t *session_, - bool flow_in_, bool flow_out_) +void zmq::object_t::process_own (object_t *object_) { zmq_assert (false); } -void zmq::object_t::process_unreg (simple_semaphore_t *smph_) +void zmq::object_t::process_bind () { zmq_assert (false); } -void zmq::object_t::process_engine (i_engine *engine_) +void zmq::object_t::process_term_req (object_t *object_) { zmq_assert (false); } -void zmq::object_t::process_terminate () +void zmq::object_t::process_term () { zmq_assert (false); } -void zmq::object_t::process_terminate_ack () +void zmq::object_t::process_term_ack () { zmq_assert (false); } @@ -256,6 +192,6 @@ void zmq::object_t::send_command (command_t &cmd_) if (destination_thread_slot == thread_slot) cmd_.destination->process_command (cmd_); else - context->write (thread_slot, destination_thread_slot, cmd_); + dispatcher->write (thread_slot, destination_thread_slot, cmd_); } -- cgit v1.2.3