summaryrefslogtreecommitdiff
path: root/src/object.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/object.cpp')
-rw-r--r--src/object.cpp43
1 files changed, 36 insertions, 7 deletions
diff --git a/src/object.cpp b/src/object.cpp
index e2267d6..b1b7c8a 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -22,6 +22,10 @@
#include "err.hpp"
#include "io_thread.hpp"
#include "simple_semaphore.hpp"
+#include "owned.hpp"
+#include "session.hpp"
+#include "socket_base.hpp"
+#include "zmq_engine.hpp" // TODO: remove this line
zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) :
dispatcher (dispatcher_),
@@ -65,6 +69,10 @@ void zmq::object_t::process_command (command_t &cmd_)
process_own (cmd_.args.own.object);
return;
+ case command_t::attach:
+ process_attach (cmd_.args.attach.engine);
+ return;
+
case command_t::bind:
process_bind ();
return;
@@ -101,15 +109,18 @@ void zmq::object_t::send_stop ()
dispatcher->write (thread_slot, thread_slot, cmd);
}
-void zmq::object_t::send_plug (object_t *destination_)
+void zmq::object_t::send_plug (owned_t *destination_)
{
+ // Let the object know that it cannot shut down till it gets this command.
+ destination_->inc_seqnum ();
+
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::plug;
send_command (cmd);
}
-void zmq::object_t::send_own (object_t *destination_, object_t *object_)
+void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
{
command_t cmd;
cmd.destination = destination_;
@@ -118,6 +129,18 @@ void zmq::object_t::send_own (object_t *destination_, object_t *object_)
send_command (cmd);
}
+void zmq::object_t::send_attach (session_t *destination_, zmq_engine_t *engine_)
+{
+ // Let the object know that it cannot shut down till it gets this command.
+ destination_->inc_seqnum ();
+
+ command_t cmd;
+ cmd.destination = destination_;
+ cmd.type = command_t::attach;
+ cmd.args.attach.engine = engine_;
+ send_command (cmd);
+}
+
void zmq::object_t::send_bind (object_t *destination_)
{
command_t cmd;
@@ -126,7 +149,8 @@ void zmq::object_t::send_bind (object_t *destination_)
send_command (cmd);
}
-void zmq::object_t::send_term_req (object_t *destination_, object_t *object_)
+void zmq::object_t::send_term_req (socket_base_t *destination_,
+ owned_t *object_)
{
command_t cmd;
cmd.destination = destination_;
@@ -135,7 +159,7 @@ void zmq::object_t::send_term_req (object_t *destination_, object_t *object_)
send_command (cmd);
}
-void zmq::object_t::send_term (object_t *destination_)
+void zmq::object_t::send_term (owned_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
@@ -143,7 +167,7 @@ void zmq::object_t::send_term (object_t *destination_)
send_command (cmd);
}
-void zmq::object_t::send_term_ack (object_t *destination_)
+void zmq::object_t::send_term_ack (socket_base_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
@@ -161,7 +185,12 @@ void zmq::object_t::process_plug ()
zmq_assert (false);
}
-void zmq::object_t::process_own (object_t *object_)
+void zmq::object_t::process_own (owned_t *object_)
+{
+ zmq_assert (false);
+}
+
+void zmq::object_t::process_attach (zmq_engine_t *engine_)
{
zmq_assert (false);
}
@@ -171,7 +200,7 @@ void zmq::object_t::process_bind ()
zmq_assert (false);
}
-void zmq::object_t::process_term_req (object_t *object_)
+void zmq::object_t::process_term_req (owned_t *object_)
{
zmq_assert (false);
}