summaryrefslogtreecommitdiff
path: root/src/io_object.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-09 11:21:47 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-09 11:21:47 +0200
commitbde396f1561fb5e57e6e413a40d904586e186d42 (patch)
treecbd7537c95bbc8ab8a09a17cec6533a268500264 /src/io_object.cpp
parent5b5b513330e96e3e08d0c2c60d03044091976420 (diff)
fix to 3-thread synchronisation algorithm
Diffstat (limited to 'src/io_object.cpp')
-rw-r--r--src/io_object.cpp48
1 files changed, 47 insertions, 1 deletions
diff --git a/src/io_object.cpp b/src/io_object.cpp
index a4badd7..b7c70d4 100644
--- a/src/io_object.cpp
+++ b/src/io_object.cpp
@@ -19,10 +19,13 @@
#include "io_object.hpp"
#include "io_thread.hpp"
+#include "err.hpp"
zmq::io_object_t::io_object_t (io_thread_t *parent_, object_t *owner_) :
object_t (parent_),
- owner (owner_)
+ owner (owner_),
+ plugged_in (false),
+ terminated (false)
{
// Retrieve the poller from the thread we are running in.
poller = parent_->get_poller ();
@@ -32,6 +35,23 @@ zmq::io_object_t::~io_object_t ()
{
}
+void zmq::io_object_t::process_plug ()
+{
+ zmq_assert (!plugged_in);
+
+ // If termination of the object was already requested, destroy it and
+ // send the termination acknowledgement.
+ if (terminated) {
+ send_term_ack (owner);
+ delete this;
+ return;
+ }
+
+ // Notify the generic termination mechanism (io_object_t) that the object
+ // is already plugged in.
+ plugged_in = true;
+}
+
zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_, i_poll_events *events_)
{
return poller->add_fd (fd_, events_);
@@ -72,6 +92,21 @@ void zmq::io_object_t::cancel_timer (i_poll_events *events_)
poller->cancel_timer (events_);
}
+void zmq::io_object_t::in_event ()
+{
+ zmq_assert (false);
+}
+
+void zmq::io_object_t::out_event ()
+{
+ zmq_assert (false);
+}
+
+void zmq::io_object_t::timer_event ()
+{
+ zmq_assert (false);
+}
+
void zmq::io_object_t::term ()
{
send_term_req (owner, this);
@@ -79,6 +114,17 @@ void zmq::io_object_t::term ()
void zmq::io_object_t::process_term ()
{
+ zmq_assert (!terminated);
+
+ // If termination request has occured even before the object was plugged in
+ // wait till plugging in happens, then acknowledge the termination.
+ if (!plugged_in) {
+ terminated = true;
+ return;
+ }
+
+ // Otherwise, destroy the object and acknowledge the termination
+ // straight away.
send_term_ack (owner);
delete this;
}