From 7146ef85e96551ce6f7b80d014463f246d09c878 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 2 Dec 2009 21:26:47 +0100 Subject: seqnum mechanism automated --- src/object.cpp | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) (limited to 'src/object.cpp') diff --git a/src/object.cpp b/src/object.cpp index fda7b03..66079db 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -72,6 +72,7 @@ void zmq::object_t::process_command (command_t &cmd_) case command_t::plug: process_plug (); + process_seqnum (); return; case command_t::own: @@ -80,10 +81,12 @@ void zmq::object_t::process_command (command_t &cmd_) case command_t::attach: process_attach (cmd_.args.attach.engine); + process_seqnum (); return; case command_t::bind: process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); + process_seqnum (); return; case command_t::pipe_term: @@ -151,10 +154,10 @@ void zmq::object_t::send_stop () dispatcher->write (thread_slot, thread_slot, cmd); } -void zmq::object_t::send_plug (owned_t *destination_) +void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_) { - // Let the object know that it cannot shut down till it gets this command. - destination_->inc_seqnum (); + if (inc_seqnum_) + destination_->inc_seqnum (); command_t cmd; cmd.destination = destination_; @@ -171,10 +174,12 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) send_command (cmd); } -void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_) +void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, + bool inc_seqnum_) { - // The assumption here is that command sequence number of the destination - // object was already incremented in find_session function. + if (inc_seqnum_) + destination_->inc_seqnum (); + command_t cmd; cmd.destination = destination_; cmd.type = command_t::attach; @@ -183,8 +188,11 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_) } void zmq::object_t::send_bind (socket_base_t *destination_, - reader_t *in_pipe_, writer_t *out_pipe_) + reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_) { + if (inc_seqnum_) + destination_->inc_seqnum (); + command_t cmd; cmd.destination = destination_; cmd.type = command_t::bind; @@ -298,6 +306,11 @@ void zmq::object_t::process_term_ack () zmq_assert (false); } +void zmq::object_t::process_seqnum () +{ + zmq_assert (false); +} + void zmq::object_t::send_command (command_t &cmd_) { int destination_thread_slot = cmd_.destination->get_thread_slot (); -- cgit v1.2.3