summaryrefslogtreecommitdiff
path: root/src/owned.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/owned.cpp')
-rw-r--r--src/owned.cpp53
1 files changed, 26 insertions, 27 deletions
diff --git a/src/owned.cpp b/src/owned.cpp
index 22e257f..6995a39 100644
--- a/src/owned.cpp
+++ b/src/owned.cpp
@@ -20,11 +20,12 @@
#include "owned.hpp"
#include "err.hpp"
-zmq::owned_t::owned_t (object_t *parent_, object_t *owner_) :
+zmq::owned_t::owned_t (object_t *parent_, socket_base_t *owner_) :
object_t (parent_),
owner (owner_),
- plugged_in (false),
- terminated (false)
+ sent_seqnum (0),
+ processed_seqnum (0),
+ shutting_down (false)
{
}
@@ -32,21 +33,18 @@ zmq::owned_t::~owned_t ()
{
}
-void zmq::owned_t::process_plug ()
+void zmq::owned_t::inc_seqnum ()
{
- zmq_assert (!plugged_in);
+ // NB: This function may be called from a different thread!
+ sent_seqnum.add (1);
+}
- // If termination of the object was already requested, destroy it and
- // send the termination acknowledgement.
- if (terminated) {
- send_term_ack (owner);
- delete this;
- return;
- }
+void zmq::owned_t::process_plug ()
+{
+ // Keep track of how many commands were processed so far.
+ processed_seqnum++;
- // Notify the generic termination mechanism (io_object_t) that the object
- // is already plugged in.
- plugged_in = true;
+ finalise_command ();
}
void zmq::owned_t::term ()
@@ -56,19 +54,20 @@ void zmq::owned_t::term ()
void zmq::owned_t::process_term ()
{
- zmq_assert (!terminated);
+ zmq_assert (!shutting_down);
+ shutting_down = true;
- // If termination request has occured even before the object was plugged in
- // wait till plugging in happens, then acknowledge the termination.
- if (!plugged_in) {
- terminated = true;
- return;
- }
+ finalise_command ();
+}
- // Otherwise, destroy the object and acknowledge the termination
- // straight away.
- send_term_ack (owner);
- process_unplug ();
- delete this;
+void zmq::owned_t::finalise_command ()
+{
+ // If termination request was already received and there are no more
+ // commands to wait for, terminate the object.
+ if (shutting_down && processed_seqnum == sent_seqnum.get ()) {
+ send_term_ack (owner);
+ process_unplug ();
+ delete this;
+ }
}