From a801b6d8b37557ccfb53030dca22f89a3f99b59c Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 20 Aug 2009 11:32:23 +0200 Subject: couple of bugs in shutdown mechanism fixed --- src/object.cpp | 43 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 7 deletions(-) (limited to 'src/object.cpp') diff --git a/src/object.cpp b/src/object.cpp index e2267d6..b1b7c8a 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -22,6 +22,10 @@ #include "err.hpp" #include "io_thread.hpp" #include "simple_semaphore.hpp" +#include "owned.hpp" +#include "session.hpp" +#include "socket_base.hpp" +#include "zmq_engine.hpp" // TODO: remove this line zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) : dispatcher (dispatcher_), @@ -65,6 +69,10 @@ void zmq::object_t::process_command (command_t &cmd_) process_own (cmd_.args.own.object); return; + case command_t::attach: + process_attach (cmd_.args.attach.engine); + return; + case command_t::bind: process_bind (); return; @@ -101,15 +109,18 @@ void zmq::object_t::send_stop () dispatcher->write (thread_slot, thread_slot, cmd); } -void zmq::object_t::send_plug (object_t *destination_) +void zmq::object_t::send_plug (owned_t *destination_) { + // Let the object know that it cannot shut down till it gets this command. + destination_->inc_seqnum (); + command_t cmd; cmd.destination = destination_; cmd.type = command_t::plug; send_command (cmd); } -void zmq::object_t::send_own (object_t *destination_, object_t *object_) +void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) { command_t cmd; cmd.destination = destination_; @@ -118,6 +129,18 @@ void zmq::object_t::send_own (object_t *destination_, object_t *object_) send_command (cmd); } +void zmq::object_t::send_attach (session_t *destination_, zmq_engine_t *engine_) +{ + // Let the object know that it cannot shut down till it gets this command. + destination_->inc_seqnum (); + + command_t cmd; + cmd.destination = destination_; + cmd.type = command_t::attach; + cmd.args.attach.engine = engine_; + send_command (cmd); +} + void zmq::object_t::send_bind (object_t *destination_) { command_t cmd; @@ -126,7 +149,8 @@ void zmq::object_t::send_bind (object_t *destination_) send_command (cmd); } -void zmq::object_t::send_term_req (object_t *destination_, object_t *object_) +void zmq::object_t::send_term_req (socket_base_t *destination_, + owned_t *object_) { command_t cmd; cmd.destination = destination_; @@ -135,7 +159,7 @@ void zmq::object_t::send_term_req (object_t *destination_, object_t *object_) send_command (cmd); } -void zmq::object_t::send_term (object_t *destination_) +void zmq::object_t::send_term (owned_t *destination_) { command_t cmd; cmd.destination = destination_; @@ -143,7 +167,7 @@ void zmq::object_t::send_term (object_t *destination_) send_command (cmd); } -void zmq::object_t::send_term_ack (object_t *destination_) +void zmq::object_t::send_term_ack (socket_base_t *destination_) { command_t cmd; cmd.destination = destination_; @@ -161,7 +185,12 @@ void zmq::object_t::process_plug () zmq_assert (false); } -void zmq::object_t::process_own (object_t *object_) +void zmq::object_t::process_own (owned_t *object_) +{ + zmq_assert (false); +} + +void zmq::object_t::process_attach (zmq_engine_t *engine_) { zmq_assert (false); } @@ -171,7 +200,7 @@ void zmq::object_t::process_bind () zmq_assert (false); } -void zmq::object_t::process_term_req (object_t *object_) +void zmq::object_t::process_term_req (owned_t *object_) { zmq_assert (false); } -- cgit v1.2.3