summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/dispatcher.cpp2
-rw-r--r--src/object.cpp27
-rw-r--r--src/object.hpp12
-rw-r--r--src/owned.cpp26
-rw-r--r--src/owned.hpp14
-rw-r--r--src/session.cpp5
-rw-r--r--src/socket_base.cpp12
-rw-r--r--src/socket_base.hpp1
-rw-r--r--src/zmq_connecter.cpp1
-rw-r--r--src/zmq_connecter_init.cpp4
-rw-r--r--src/zmq_listener.cpp2
-rw-r--r--src/zmq_listener_init.cpp6
12 files changed, 54 insertions, 58 deletions
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 1e41ee8..51143b3 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -252,6 +252,8 @@ zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_)
// Increment the command sequence number of the peer so that it won't
// get deallocated until "bind" command is issued by the caller.
+ // The subsequent 'bind' has to be called with inc_seqnum parameter
+ // set to false, so that the seqnum isn't incremented twice.
endpoint->inc_seqnum ();
endpoints_sync.unlock ();
diff --git a/src/object.cpp b/src/object.cpp
index fda7b03..66079db 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -72,6 +72,7 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::plug:
process_plug ();
+ process_seqnum ();
return;
case command_t::own:
@@ -80,10 +81,12 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::attach:
process_attach (cmd_.args.attach.engine);
+ process_seqnum ();
return;
case command_t::bind:
process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
+ process_seqnum ();
return;
case command_t::pipe_term:
@@ -151,10 +154,10 @@ void zmq::object_t::send_stop ()
dispatcher->write (thread_slot, thread_slot, cmd);
}
-void zmq::object_t::send_plug (owned_t *destination_)
+void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_)
{
- // Let the object know that it cannot shut down till it gets this command.
- destination_->inc_seqnum ();
+ if (inc_seqnum_)
+ destination_->inc_seqnum ();
command_t cmd;
cmd.destination = destination_;
@@ -171,10 +174,12 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
send_command (cmd);
}
-void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_)
+void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
+ bool inc_seqnum_)
{
- // The assumption here is that command sequence number of the destination
- // object was already incremented in find_session function.
+ if (inc_seqnum_)
+ destination_->inc_seqnum ();
+
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::attach;
@@ -183,8 +188,11 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_)
}
void zmq::object_t::send_bind (socket_base_t *destination_,
- reader_t *in_pipe_, writer_t *out_pipe_)
+ reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_)
{
+ if (inc_seqnum_)
+ destination_->inc_seqnum ();
+
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::bind;
@@ -298,6 +306,11 @@ void zmq::object_t::process_term_ack ()
zmq_assert (false);
}
+void zmq::object_t::process_seqnum ()
+{
+ zmq_assert (false);
+}
+
void zmq::object_t::send_command (command_t &cmd_)
{
int destination_thread_slot = cmd_.destination->get_thread_slot ();
diff --git a/src/object.hpp b/src/object.hpp
index 9d4cd9a..84a57cd 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -63,13 +63,14 @@ namespace zmq
// Derived object can use these functions to send commands
// to other objects.
void send_stop ();
- void send_plug (class owned_t *destination_);
+ void send_plug (class owned_t *destination_, bool inc_seqnum_ = true);
void send_own (class socket_base_t *destination_,
class owned_t *object_);
void send_attach (class session_t *destination_,
- struct i_engine *engine_);
+ struct i_engine *engine_, bool inc_seqnum_ = true);
void send_bind (class socket_base_t *destination_,
- class reader_t *in_pipe_, class writer_t *out_pipe_);
+ class reader_t *in_pipe_, class writer_t *out_pipe_,
+ bool inc_seqnum_ = true);
void send_revive (class object_t *destination_);
void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_);
@@ -93,6 +94,11 @@ namespace zmq
virtual void process_term ();
virtual void process_term_ack ();
+ // Special handler called after a command that requires a seqnum
+ // was processed. The implementation should catch up with its counter
+ // of processed commands here.
+ virtual void process_seqnum ();
+
// Pointer to the root of the infrastructure.
class dispatcher_t *dispatcher;
diff --git a/src/owned.cpp b/src/owned.cpp
index a534dd3..1cb331c 100644
--- a/src/owned.cpp
+++ b/src/owned.cpp
@@ -39,22 +39,6 @@ void zmq::owned_t::inc_seqnum ()
sent_seqnum.add (1);
}
-void zmq::owned_t::process_plug ()
-{
- // Keep track of how many commands were processed so far.
- processed_seqnum++;
-
- finalise_command ();
-}
-
-void zmq::owned_t::process_attach (struct i_engine *engine_)
-{
- // Keep track of how many commands were processed so far.
- processed_seqnum++;
-
- finalise_command ();
-}
-
void zmq::owned_t::term ()
{
send_term_req (owner, this);
@@ -64,11 +48,17 @@ void zmq::owned_t::process_term ()
{
zmq_assert (!shutting_down);
shutting_down = true;
+ finalise ();
+}
- finalise_command ();
+void zmq::owned_t::process_seqnum ()
+{
+ // Catch up with counter of processed commands.
+ processed_seqnum++;
+ finalise ();
}
-void zmq::owned_t::finalise_command ()
+void zmq::owned_t::finalise ()
{
// If termination request was already received and there are no more
// commands to wait for, terminate the object.
diff --git a/src/owned.hpp b/src/owned.hpp
index c56ea49..bfa8185 100644
--- a/src/owned.hpp
+++ b/src/owned.hpp
@@ -54,15 +54,6 @@ namespace zmq
// of the owned object correctly.
virtual ~owned_t ();
- // Handlers for incoming commands. It's vital that every I/O object
- // invokes io_object_t::process_plug at the end of it's own plug
- // handler.
- void process_plug ();
-
- // It's vital that session invokes io_object_t::process_attach
- // at the end of it's own attach handler.
- void process_attach (struct i_engine *engine_);
-
// io_object_t defines a new handler used to disconnect the object
// from the poller object. Implement the handlen in the derived
// classes to ensure sane cleanup.
@@ -76,10 +67,9 @@ namespace zmq
// Handlers for incoming commands.
void process_term ();
+ void process_seqnum ();
- // Generic command handler (to be called from all command handlers
- // once the processing is done).
- void finalise_command ();
+ void finalise ();
// Sequence number of the last command sent to this object.
atomic_counter_t sent_seqnum;
diff --git a/src/session.cpp b/src/session.cpp
index 5290d6b..912cfa8 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -151,12 +151,9 @@ void zmq::session_t::process_plug ()
out_pipe->set_endpoint (this);
}
- owner->inc_seqnum ();
send_bind (owner, outbound ? &outbound->reader : NULL,
inbound ? &inbound->writer : NULL);
}
-
- owned_t::process_plug ();
}
void zmq::session_t::process_unplug ()
@@ -190,6 +187,4 @@ void zmq::session_t::process_attach (i_engine *engine_)
zmq_assert (engine_);
engine = engine_;
engine->plug (this);
-
- owned_t::process_attach (engine_);
}
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 6a972e4..888b6ea 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -161,7 +161,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// was incremented in find_endpoint function. The callee is notified
// about the fact via the last parameter.
send_bind (peer, out_pipe ? &out_pipe->reader : NULL,
- in_pipe ? &in_pipe->writer : NULL);
+ in_pipe ? &in_pipe->writer : NULL, false);
return 0;
}
@@ -247,8 +247,6 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
- // Reserve a sequence number for following 'attach' command.
- session->inc_seqnum ();
send_attach (session, pgm_sender);
}
else if (options.requires_in) {
@@ -264,8 +262,6 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
- // Reserve a sequence number for following 'attach' command.
- session->inc_seqnum ();
send_attach (session, pgm_receiver);
}
else
@@ -511,7 +507,6 @@ void zmq::socket_base_t::process_own (owned_t *object_)
void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_)
{
- processed_seqnum++;
attach_pipes (in_pipe_, out_pipe_);
}
@@ -542,3 +537,8 @@ void zmq::socket_base_t::process_term_ack ()
pending_term_acks--;
}
+void zmq::socket_base_t::process_seqnum ()
+{
+ processed_seqnum++;
+}
+
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index c766bda..79a8340 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -117,6 +117,7 @@ namespace zmq
void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_);
void process_term_req (class owned_t *object_);
void process_term_ack ();
+ void process_seqnum ();
// List of all I/O objects owned by this socket. The socket is
// responsible for deallocating them before it quits.
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index 28056b2..657622a 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -49,7 +49,6 @@ void zmq::zmq_connecter_t::process_plug ()
add_timer ();
else
start_connecting ();
- owned_t::process_plug ();
}
void zmq::zmq_connecter_t::process_unplug ()
diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp
index f0cbf90..3f165cd 100644
--- a/src/zmq_connecter_init.cpp
+++ b/src/zmq_connecter_init.cpp
@@ -67,7 +67,8 @@ bool zmq::zmq_connecter_init_t::read (::zmq_msg_t *msg_)
zmq_assert (false);
}
- send_attach (session, engine);
+ // No need to increment seqnum as it was alredy incremented above.
+ send_attach (session, engine, false);
engine = NULL;
// Destroy the init object.
@@ -113,7 +114,6 @@ void zmq::zmq_connecter_init_t::process_plug ()
{
zmq_assert (engine);
engine->plug (this);
- owned_t::process_plug ();
}
void zmq::zmq_connecter_init_t::process_unplug ()
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index 35e4428..5d678a2 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -44,8 +44,6 @@ void zmq::zmq_listener_t::process_plug ()
// Start polling for incoming connections.
handle = add_fd (tcp_listener.get_fd ());
set_pollin (handle);
-
- owned_t::process_plug ();
}
void zmq::zmq_listener_t::process_unplug ()
diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp
index 0d9488d..632bebe 100644
--- a/src/zmq_listener_init.cpp
+++ b/src/zmq_listener_init.cpp
@@ -83,7 +83,10 @@ void zmq::zmq_listener_init_t::flush ()
// Reserve a sequence number for following 'attach' command.
session->inc_seqnum ();
}
- send_attach (session, engine);
+
+ // No need to increment seqnum as it was laready incremented above.
+ send_attach (session, engine, false);
+
engine = NULL;
// Destroy the init object.
@@ -103,7 +106,6 @@ void zmq::zmq_listener_init_t::process_plug ()
{
zmq_assert (engine);
engine->plug (this);
- owned_t::process_plug ();
}
void zmq::zmq_listener_init_t::process_unplug ()