summaryrefslogtreecommitdiff
path: root/src/object.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-12-02 21:26:47 +0100
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-12-02 21:26:47 +0100
commit7146ef85e96551ce6f7b80d014463f246d09c878 (patch)
tree103ccf90868acc3982987643817f385fbde76681 /src/object.cpp
parentcb84580bbced0b5d34ddcbac6e0aed5d0ad7cae6 (diff)
seqnum mechanism automated
Diffstat (limited to 'src/object.cpp')
-rw-r--r--src/object.cpp27
1 files changed, 20 insertions, 7 deletions
diff --git a/src/object.cpp b/src/object.cpp
index fda7b03..66079db 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -72,6 +72,7 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::plug:
process_plug ();
+ process_seqnum ();
return;
case command_t::own:
@@ -80,10 +81,12 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::attach:
process_attach (cmd_.args.attach.engine);
+ process_seqnum ();
return;
case command_t::bind:
process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
+ process_seqnum ();
return;
case command_t::pipe_term:
@@ -151,10 +154,10 @@ void zmq::object_t::send_stop ()
dispatcher->write (thread_slot, thread_slot, cmd);
}
-void zmq::object_t::send_plug (owned_t *destination_)
+void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_)
{
- // Let the object know that it cannot shut down till it gets this command.
- destination_->inc_seqnum ();
+ if (inc_seqnum_)
+ destination_->inc_seqnum ();
command_t cmd;
cmd.destination = destination_;
@@ -171,10 +174,12 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
send_command (cmd);
}
-void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_)
+void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
+ bool inc_seqnum_)
{
- // The assumption here is that command sequence number of the destination
- // object was already incremented in find_session function.
+ if (inc_seqnum_)
+ destination_->inc_seqnum ();
+
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::attach;
@@ -183,8 +188,11 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_)
}
void zmq::object_t::send_bind (socket_base_t *destination_,
- reader_t *in_pipe_, writer_t *out_pipe_)
+ reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_)
{
+ if (inc_seqnum_)
+ destination_->inc_seqnum ();
+
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::bind;
@@ -298,6 +306,11 @@ void zmq::object_t::process_term_ack ()
zmq_assert (false);
}
+void zmq::object_t::process_seqnum ()
+{
+ zmq_assert (false);
+}
+
void zmq::object_t::send_command (command_t &cmd_)
{
int destination_thread_slot = cmd_.destination->get_thread_slot ();