From 7146ef85e96551ce6f7b80d014463f246d09c878 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 2 Dec 2009 21:26:47 +0100 Subject: seqnum mechanism automated --- src/dispatcher.cpp | 2 ++ src/object.cpp | 27 ++++++++++++++++++++------- src/object.hpp | 12 +++++++++--- src/owned.cpp | 26 ++++++++------------------ src/owned.hpp | 14 ++------------ src/session.cpp | 5 ----- src/socket_base.cpp | 12 ++++++------ src/socket_base.hpp | 1 + src/zmq_connecter.cpp | 1 - src/zmq_connecter_init.cpp | 4 ++-- src/zmq_listener.cpp | 2 -- src/zmq_listener_init.cpp | 6 ++++-- 12 files changed, 54 insertions(+), 58 deletions(-) (limited to 'src') diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 1e41ee8..51143b3 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -252,6 +252,8 @@ zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_) // Increment the command sequence number of the peer so that it won't // get deallocated until "bind" command is issued by the caller. + // The subsequent 'bind' has to be called with inc_seqnum parameter + // set to false, so that the seqnum isn't incremented twice. endpoint->inc_seqnum (); endpoints_sync.unlock (); diff --git a/src/object.cpp b/src/object.cpp index fda7b03..66079db 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -72,6 +72,7 @@ void zmq::object_t::process_command (command_t &cmd_) case command_t::plug: process_plug (); + process_seqnum (); return; case command_t::own: @@ -80,10 +81,12 @@ void zmq::object_t::process_command (command_t &cmd_) case command_t::attach: process_attach (cmd_.args.attach.engine); + process_seqnum (); return; case command_t::bind: process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); + process_seqnum (); return; case command_t::pipe_term: @@ -151,10 +154,10 @@ void zmq::object_t::send_stop () dispatcher->write (thread_slot, thread_slot, cmd); } -void zmq::object_t::send_plug (owned_t *destination_) +void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_) { - // Let the object know that it cannot shut down till it gets this command. - destination_->inc_seqnum (); + if (inc_seqnum_) + destination_->inc_seqnum (); command_t cmd; cmd.destination = destination_; @@ -171,10 +174,12 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) send_command (cmd); } -void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_) +void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, + bool inc_seqnum_) { - // The assumption here is that command sequence number of the destination - // object was already incremented in find_session function. + if (inc_seqnum_) + destination_->inc_seqnum (); + command_t cmd; cmd.destination = destination_; cmd.type = command_t::attach; @@ -183,8 +188,11 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_) } void zmq::object_t::send_bind (socket_base_t *destination_, - reader_t *in_pipe_, writer_t *out_pipe_) + reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_) { + if (inc_seqnum_) + destination_->inc_seqnum (); + command_t cmd; cmd.destination = destination_; cmd.type = command_t::bind; @@ -298,6 +306,11 @@ void zmq::object_t::process_term_ack () zmq_assert (false); } +void zmq::object_t::process_seqnum () +{ + zmq_assert (false); +} + void zmq::object_t::send_command (command_t &cmd_) { int destination_thread_slot = cmd_.destination->get_thread_slot (); diff --git a/src/object.hpp b/src/object.hpp index 9d4cd9a..84a57cd 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -63,13 +63,14 @@ namespace zmq // Derived object can use these functions to send commands // to other objects. void send_stop (); - void send_plug (class owned_t *destination_); + void send_plug (class owned_t *destination_, bool inc_seqnum_ = true); void send_own (class socket_base_t *destination_, class owned_t *object_); void send_attach (class session_t *destination_, - struct i_engine *engine_); + struct i_engine *engine_, bool inc_seqnum_ = true); void send_bind (class socket_base_t *destination_, - class reader_t *in_pipe_, class writer_t *out_pipe_); + class reader_t *in_pipe_, class writer_t *out_pipe_, + bool inc_seqnum_ = true); void send_revive (class object_t *destination_); void send_pipe_term (class writer_t *destination_); void send_pipe_term_ack (class reader_t *destination_); @@ -93,6 +94,11 @@ namespace zmq virtual void process_term (); virtual void process_term_ack (); + // Special handler called after a command that requires a seqnum + // was processed. The implementation should catch up with its counter + // of processed commands here. + virtual void process_seqnum (); + // Pointer to the root of the infrastructure. class dispatcher_t *dispatcher; diff --git a/src/owned.cpp b/src/owned.cpp index a534dd3..1cb331c 100644 --- a/src/owned.cpp +++ b/src/owned.cpp @@ -39,22 +39,6 @@ void zmq::owned_t::inc_seqnum () sent_seqnum.add (1); } -void zmq::owned_t::process_plug () -{ - // Keep track of how many commands were processed so far. - processed_seqnum++; - - finalise_command (); -} - -void zmq::owned_t::process_attach (struct i_engine *engine_) -{ - // Keep track of how many commands were processed so far. - processed_seqnum++; - - finalise_command (); -} - void zmq::owned_t::term () { send_term_req (owner, this); @@ -64,11 +48,17 @@ void zmq::owned_t::process_term () { zmq_assert (!shutting_down); shutting_down = true; + finalise (); +} - finalise_command (); +void zmq::owned_t::process_seqnum () +{ + // Catch up with counter of processed commands. + processed_seqnum++; + finalise (); } -void zmq::owned_t::finalise_command () +void zmq::owned_t::finalise () { // If termination request was already received and there are no more // commands to wait for, terminate the object. diff --git a/src/owned.hpp b/src/owned.hpp index c56ea49..bfa8185 100644 --- a/src/owned.hpp +++ b/src/owned.hpp @@ -54,15 +54,6 @@ namespace zmq // of the owned object correctly. virtual ~owned_t (); - // Handlers for incoming commands. It's vital that every I/O object - // invokes io_object_t::process_plug at the end of it's own plug - // handler. - void process_plug (); - - // It's vital that session invokes io_object_t::process_attach - // at the end of it's own attach handler. - void process_attach (struct i_engine *engine_); - // io_object_t defines a new handler used to disconnect the object // from the poller object. Implement the handlen in the derived // classes to ensure sane cleanup. @@ -76,10 +67,9 @@ namespace zmq // Handlers for incoming commands. void process_term (); + void process_seqnum (); - // Generic command handler (to be called from all command handlers - // once the processing is done). - void finalise_command (); + void finalise (); // Sequence number of the last command sent to this object. atomic_counter_t sent_seqnum; diff --git a/src/session.cpp b/src/session.cpp index 5290d6b..912cfa8 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -151,12 +151,9 @@ void zmq::session_t::process_plug () out_pipe->set_endpoint (this); } - owner->inc_seqnum (); send_bind (owner, outbound ? &outbound->reader : NULL, inbound ? &inbound->writer : NULL); } - - owned_t::process_plug (); } void zmq::session_t::process_unplug () @@ -190,6 +187,4 @@ void zmq::session_t::process_attach (i_engine *engine_) zmq_assert (engine_); engine = engine_; engine->plug (this); - - owned_t::process_attach (engine_); } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 6a972e4..888b6ea 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -161,7 +161,7 @@ int zmq::socket_base_t::connect (const char *addr_) // was incremented in find_endpoint function. The callee is notified // about the fact via the last parameter. send_bind (peer, out_pipe ? &out_pipe->reader : NULL, - in_pipe ? &in_pipe->writer : NULL); + in_pipe ? &in_pipe->writer : NULL, false); return 0; } @@ -247,8 +247,6 @@ int zmq::socket_base_t::connect (const char *addr_) return -1; } - // Reserve a sequence number for following 'attach' command. - session->inc_seqnum (); send_attach (session, pgm_sender); } else if (options.requires_in) { @@ -264,8 +262,6 @@ int zmq::socket_base_t::connect (const char *addr_) return -1; } - // Reserve a sequence number for following 'attach' command. - session->inc_seqnum (); send_attach (session, pgm_receiver); } else @@ -511,7 +507,6 @@ void zmq::socket_base_t::process_own (owned_t *object_) void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) { - processed_seqnum++; attach_pipes (in_pipe_, out_pipe_); } @@ -542,3 +537,8 @@ void zmq::socket_base_t::process_term_ack () pending_term_acks--; } +void zmq::socket_base_t::process_seqnum () +{ + processed_seqnum++; +} + diff --git a/src/socket_base.hpp b/src/socket_base.hpp index c766bda..79a8340 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -117,6 +117,7 @@ namespace zmq void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_); void process_term_req (class owned_t *object_); void process_term_ack (); + void process_seqnum (); // List of all I/O objects owned by this socket. The socket is // responsible for deallocating them before it quits. diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 28056b2..657622a 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -49,7 +49,6 @@ void zmq::zmq_connecter_t::process_plug () add_timer (); else start_connecting (); - owned_t::process_plug (); } void zmq::zmq_connecter_t::process_unplug () diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp index f0cbf90..3f165cd 100644 --- a/src/zmq_connecter_init.cpp +++ b/src/zmq_connecter_init.cpp @@ -67,7 +67,8 @@ bool zmq::zmq_connecter_init_t::read (::zmq_msg_t *msg_) zmq_assert (false); } - send_attach (session, engine); + // No need to increment seqnum as it was alredy incremented above. + send_attach (session, engine, false); engine = NULL; // Destroy the init object. @@ -113,7 +114,6 @@ void zmq::zmq_connecter_init_t::process_plug () { zmq_assert (engine); engine->plug (this); - owned_t::process_plug (); } void zmq::zmq_connecter_init_t::process_unplug () diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index 35e4428..5d678a2 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -44,8 +44,6 @@ void zmq::zmq_listener_t::process_plug () // Start polling for incoming connections. handle = add_fd (tcp_listener.get_fd ()); set_pollin (handle); - - owned_t::process_plug (); } void zmq::zmq_listener_t::process_unplug () diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp index 0d9488d..632bebe 100644 --- a/src/zmq_listener_init.cpp +++ b/src/zmq_listener_init.cpp @@ -83,7 +83,10 @@ void zmq::zmq_listener_init_t::flush () // Reserve a sequence number for following 'attach' command. session->inc_seqnum (); } - send_attach (session, engine); + + // No need to increment seqnum as it was laready incremented above. + send_attach (session, engine, false); + engine = NULL; // Destroy the init object. @@ -103,7 +106,6 @@ void zmq::zmq_listener_init_t::process_plug () { zmq_assert (engine); engine->plug (this); - owned_t::process_plug (); } void zmq::zmq_listener_init_t::process_unplug () -- cgit v1.2.3