From 80ac398bba928fa7f245d2e107071677a13185cf Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 9 Feb 2011 15:32:15 +0100 Subject: Initial implementation of reaper thread. Reaper thread destroys the socket asynchronously. zmq_term() can be interrupted by a signal (EINTR). zmq_socket() will return ETERM after zmq_term() was called. Signed-off-by: Martin Sustrik --- src/object.cpp | 42 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 5 deletions(-) (limited to 'src/object.cpp') diff --git a/src/object.cpp b/src/object.cpp index 63b42b4..823d4b6 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -114,6 +114,10 @@ void zmq::object_t::process_command (command_t &cmd_) process_term_ack (); break; + case command_t::reap: + process_reap (cmd_.args.reap.socket); + break; + default: zmq_assert (false); } @@ -138,6 +142,11 @@ zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_) return ctx->find_endpoint (addr_); } +void zmq::object_t::destroy_socket (socket_base_t *socket_) +{ + ctx->destroy_socket (socket_); +} + void zmq::object_t::log (zmq_msg_t *msg_) { ctx->log (msg_); @@ -148,11 +157,6 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_) return ctx->choose_io_thread (affinity_); } -void zmq::object_t::zombify_socket (socket_base_t *socket_) -{ - ctx->zombify_socket (socket_); -} - void zmq::object_t::send_stop () { // 'stop' command goes always from administrative thread to @@ -336,6 +340,29 @@ void zmq::object_t::send_term_ack (own_t *destination_) send_command (cmd); } +void zmq::object_t::send_reap (class socket_base_t *socket_) +{ + command_t cmd; +#if defined ZMQ_MAKE_VALGRIND_HAPPY + memset (&cmd, 0, sizeof (cmd)); +#endif + cmd.destination = ctx->get_reaper (); + cmd.type = command_t::reap; + cmd.args.reap.socket = socket_; + send_command (cmd); +} + +void zmq::object_t::send_done () +{ + command_t cmd; +#if defined ZMQ_MAKE_VALGRIND_HAPPY + memset (&cmd, 0, sizeof (cmd)); +#endif + cmd.destination = NULL; + cmd.type = command_t::done; + ctx->send_command (ctx_t::term_tid, cmd); +} + void zmq::object_t::process_stop () { zmq_assert (false); @@ -398,6 +425,11 @@ void zmq::object_t::process_term_ack () zmq_assert (false); } +void zmq::object_t::process_reap (class socket_base_t *socket_) +{ + zmq_assert (false); +} + void zmq::object_t::process_seqnum () { zmq_assert (false); -- cgit v1.2.3