summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-02-09 15:32:15 +0100
committerMartin Sustrik <sustrik@250bpm.com>2011-02-09 15:32:15 +0100
commit80ac398bba928fa7f245d2e107071677a13185cf (patch)
tree57c51a7adb5f4cb1a91396fe0b223538a4d428d7
parent889424e675eecd9d9c7d1121456401d5c43029a5 (diff)
Initial implementation of reaper thread.
Reaper thread destroys the socket asynchronously. zmq_term() can be interrupted by a signal (EINTR). zmq_socket() will return ETERM after zmq_term() was called. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--doc/zmq_socket.txt3
-rw-r--r--doc/zmq_term.txt2
-rw-r--r--src/Makefile.am2
-rw-r--r--src/command.hpp15
-rw-r--r--src/ctx.cpp158
-rw-r--r--src/ctx.hpp43
-rw-r--r--src/io_thread.cpp2
-rw-r--r--src/io_thread.hpp3
-rw-r--r--src/object.cpp42
-rw-r--r--src/object.hpp8
-rw-r--r--src/reaper.cpp139
-rw-r--r--src/reaper.hpp81
-rw-r--r--src/socket_base.cpp18
-rw-r--r--src/socket_base.hpp8
14 files changed, 394 insertions, 130 deletions
diff --git a/doc/zmq_socket.txt b/doc/zmq_socket.txt
index 7ac8fa1..da41ad7 100644
--- a/doc/zmq_socket.txt
+++ b/doc/zmq_socket.txt
@@ -315,7 +315,8 @@ ERRORS
The requested socket 'type' is invalid.
*EFAULT*::
The provided 'context' was not valid (NULL).
-
+*ETERM*::
+The context specified was terminated.
SEE ALSO
--------
diff --git a/doc/zmq_term.txt b/doc/zmq_term.txt
index 2edc765..d3914db 100644
--- a/doc/zmq_term.txt
+++ b/doc/zmq_term.txt
@@ -47,6 +47,8 @@ ERRORS
------
*EFAULT*::
The provided 'context' was not valid (NULL).
+*EINTR*::
+Termination was interrupted by a signal. It can be restarted if needed.
SEE ALSO
diff --git a/src/Makefile.am b/src/Makefile.am
index 9e362f2..b730f38 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -107,6 +107,7 @@ libzmq_la_SOURCES = \
pub.hpp \
pull.hpp \
push.hpp \
+ reaper.hpp \
rep.hpp \
req.hpp \
select.hpp \
@@ -166,6 +167,7 @@ libzmq_la_SOURCES = \
poller_base.cpp \
pull.cpp \
push.cpp \
+ reaper.cpp \
pub.cpp \
rep.cpp \
req.cpp \
diff --git a/src/command.hpp b/src/command.hpp
index 31a0e54..ec0850d 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -45,7 +45,9 @@ namespace zmq
pipe_term_ack,
term_req,
term,
- term_ack
+ term_ack,
+ reap,
+ done
} type;
union {
@@ -117,6 +119,17 @@ namespace zmq
struct {
} term_ack;
+ // Transfers the ownership of the closed socket
+ // to the reaper thread.
+ struct {
+ class socket_base_t *socket;
+ } reap;
+
+ // Sent by reaper thread to the term thread when all the sockets
+ // are successfully deallocated.
+ struct {
+ } done;
+
} args;
};
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 91b0236..10c91c9 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -24,6 +24,7 @@
#include "socket_base.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
+#include "reaper.hpp"
#include "err.hpp"
#include "pipe.hpp"
@@ -34,7 +35,7 @@
#endif
zmq::ctx_t::ctx_t (uint32_t io_threads_) :
- no_sockets_notify (false)
+ terminating (false)
{
int rc;
@@ -49,13 +50,23 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
HIBYTE (wsa_data.wVersion) == 2);
#endif
- // Initialise the array of mailboxes. +1 accounts for internal log socket.
- slot_count = max_sockets + io_threads_ + 1;
+ // Initialise the array of mailboxes. Additional three slots are for
+ // internal log socket and the zmq_term thread the reaper thread.
+ slot_count = max_sockets + io_threads_ + 3;
slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
zmq_assert (slots);
+ // Initialise the infrastructure for zmq_term thread.
+ slots [term_tid] = &term_mailbox;
+
+ // Create the reaper thread.
+ reaper = new (std::nothrow) reaper_t (this, reaper_tid);
+ zmq_assert (reaper);
+ slots [reaper_tid] = reaper->get_mailbox ();
+ reaper->start ();
+
// Create I/O thread objects and launch them.
- for (uint32_t i = 0; i != io_threads_; i++) {
+ for (uint32_t i = 2; i != io_threads_ + 2; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
zmq_assert (io_thread);
io_threads.push_back (io_thread);
@@ -65,7 +76,7 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
// In the unused part of the slot array, create a list of empty slots.
for (int32_t i = (int32_t) slot_count - 1;
- i >= (int32_t) io_threads_; i--) {
+ i >= (int32_t) io_threads_ + 2; i--) {
empty_slots.push_back (i);
slots [i] = NULL;
}
@@ -79,9 +90,8 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
zmq::ctx_t::~ctx_t ()
{
- // Check that there are no remaining open or zombie sockets.
+ // Check that there are no remaining sockets.
zmq_assert (sockets.empty ());
- zmq_assert (zombies.empty ());
// Ask I/O threads to terminate. If stop signal wasn't sent to I/O
// thread subsequent invocation of destructor would hang-up.
@@ -92,6 +102,9 @@ zmq::ctx_t::~ctx_t ()
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
delete io_threads [i];
+ // Deallocate the reaper thread object.
+ delete reaper;
+
// Deallocate the array of mailboxes. No special work is
// needed as mailboxes themselves were deallocated with their
// corresponding io_thread/socket objects.
@@ -106,52 +119,42 @@ zmq::ctx_t::~ctx_t ()
int zmq::ctx_t::terminate ()
{
- // Close the logging infrastructure.
- log_sync.lock ();
- int rc = log_socket->close ();
- zmq_assert (rc == 0);
- log_socket = NULL;
- log_sync.unlock ();
-
- // First send stop command to sockets so that any
- // blocking calls are interrupted.
+ // Check whether termination was already underway, but interrupted and now
+ // restarted.
slot_sync.lock ();
- for (sockets_t::size_type i = 0; i != sockets.size (); i++)
- sockets [i]->stop ();
- if (!sockets.empty ())
- no_sockets_notify = true;
+ bool restarted = terminating;
slot_sync.unlock ();
- // Find out whether there are any open sockets to care about.
- // If there are open sockets, sleep till they are closed. Note that we can
- // use no_sockets_notify safely out of the critical section as once set
- // its value is never changed again.
- if (no_sockets_notify)
- no_sockets_sync.wait ();
+ // First attempt to terminate the context.
+ if (!restarted) {
+
+ // Close the logging infrastructure.
+ log_sync.lock ();
+ int rc = log_socket->close ();
+ zmq_assert (rc == 0);
+ log_socket = NULL;
+ log_sync.unlock ();
+
+ // First send stop command to sockets so that any blocking calls can be
+ // interrupted. If there are no sockets we can ask reaper thread to stop.
+ slot_sync.lock ();
+ terminating = true;
+ for (sockets_t::size_type i = 0; i != sockets.size (); i++)
+ sockets [i]->stop ();
+ if (sockets.empty ())
+ reaper->stop ();
+ slot_sync.unlock ();
+ }
- // Note that the lock won't block anyone here. There's noone else having
- // open sockets anyway. The only purpose of the lock is to double-check all
- // the CPU caches have been synchronised.
+ // Wait till reaper thread closes all the sockets.
+ command_t cmd;
+ int rc = term_mailbox.recv (&cmd, true);
+ if (rc == -1 && errno == EINTR)
+ return -1;
+ zmq_assert (rc == 0);
+ zmq_assert (cmd.type == command_t::done);
slot_sync.lock ();
-
- // At this point there should be no active sockets. What we have is a set
- // of zombies waiting to be dezombified.
zmq_assert (sockets.empty ());
-
- // Get rid of remaining zombie sockets.
- while (!zombies.empty ()) {
- dezombify ();
-
- // Sleep for 1ms not to end up busy-looping in the case the I/O threads
- // are still busy sending data. We can possibly add a grand poll here
- // (polling for fds associated with all the zombie sockets), but it's
- // probably not worth of implementing it.
-#if defined ZMQ_HAVE_WINDOWS
- Sleep (1);
-#else
- usleep (1000);
-#endif
- }
slot_sync.unlock ();
// Deallocate the resources.
@@ -164,8 +167,12 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
slot_sync.lock ();
- // Free the slots, if possible.
- dezombify ();
+ // Once zmq_term() was called, we can't create new sockets.
+ if (terminating) {
+ slot_sync.unlock ();
+ errno = ETERM;
+ return NULL;
+ }
// If max_sockets limit was reached, return error.
if (empty_slots.empty ()) {
@@ -193,29 +200,31 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
return s;
}
-void zmq::ctx_t::zombify_socket (socket_base_t *socket_)
+void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
{
- // Zombification of socket basically means that its ownership is tranferred
- // from the application that created it to the context.
-
- // Note that the lock provides the memory barrier needed to migrate
- // zombie-to-be socket from it's native thread to shared data area
- // synchronised by slot_sync.
slot_sync.lock ();
- sockets.erase (socket_);
- zombies.push_back (socket_);
- // Try to get rid of at least some zombie sockets at this point.
- dezombify ();
+ // Free the associared thread slot.
+ uint32_t tid = socket_->get_tid ();
+ empty_slots.push_back (tid);
+ slots [tid] = NULL;
- // If shutdown thread is interested in notification about no more
- // open sockets, notify it now.
- if (sockets.empty () && no_sockets_notify)
- no_sockets_sync.post ();
+ // Remove the socket from the list of sockets.
+ sockets.erase (socket_);
+
+ // If zmq_term() was already called and there are no more socket
+ // we can ask reaper thread to terminate.
+ if (terminating && sockets.empty ())
+ reaper->stop ();
slot_sync.unlock ();
}
+zmq::object_t *zmq::ctx_t::get_reaper ()
+{
+ return reaper;
+}
+
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{
slots [tid_]->send (command_);
@@ -309,25 +318,4 @@ void zmq::ctx_t::log (zmq_msg_t *msg_)
log_sync.unlock ();
}
-void zmq::ctx_t::dezombify ()
-{
- // Try to dezombify each zombie in the list. Note that caller is
- // responsible for calling this method in the slot_sync critical section.
- for (zombies_t::iterator it = zombies.begin (); it != zombies.end ();) {
- uint32_t tid = (*it)->get_tid ();
- if ((*it)->dezombify ()) {
-#if defined _MSC_VER
-
- // HP implementation of STL requires doing it this way...
- it = zombies.erase (it);
-#else
- zombies.erase (it);
-#endif
- empty_slots.push_back (tid);
- slots [tid] = NULL;
- }
- else
- ++it;
- }
-}
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 92684e6..c07711e 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -64,11 +64,9 @@ namespace zmq
// after the last one is closed.
int terminate ();
- // Create a socket.
+ // Create and destroy a socket.
class socket_base_t *create_socket (int type_);
-
- // Make socket a zombie.
- void zombify_socket (socket_base_t *socket_);
+ void destroy_socket (class socket_base_t *socket_);
// Send command to the destination thread.
void send_command (uint32_t tid_, const command_t &command_);
@@ -78,6 +76,9 @@ namespace zmq
// Returns NULL is no I/O thread is available.
class io_thread_t *choose_io_thread (uint64_t affinity_);
+ // Returns reaper thread object.
+ class object_t *get_reaper ();
+
// Management of inproc endpoints.
int register_endpoint (const char *addr_, endpoint_t &endpoint_);
void unregister_endpoints (class socket_base_t *socket_);
@@ -86,41 +87,36 @@ namespace zmq
// Logging.
void log (zmq_msg_t *msg_);
+ enum {
+ term_tid = 0,
+ reaper_tid = 1
+ };
+
private:
~ctx_t ();
- // Sockets belonging to this context.
+ // Sockets belonging to this context. We need the list so that
+ // we can notify the sockets when zmq_term() is called. The sockets
+ // will return ETERM then.
typedef array_t <socket_base_t> sockets_t;
sockets_t sockets;
- // List of sockets that were already closed but not yet deallocated.
- // These sockets still have some pipes and I/O objects attached.
- typedef std::vector <socket_base_t*> zombies_t;
- zombies_t zombies;
-
// List of unused thread slots.
typedef std::vector <uint32_t> emtpy_slots_t;
emtpy_slots_t empty_slots;
- // If true, shutdown thread wants to be informed when there are no
- // more open sockets. Do so by posting no_sockets_sync semaphore.
- // Note that this variable is synchronised by slot_sync mutex.
- bool no_sockets_notify;
-
- // Object used by zmq_term to wait while all the sockets are closed
- // by different application threads.
- semaphore_t no_sockets_sync;
+ // If true, zmq_term was already called.
+ bool terminating;
// Synchronisation of accesses to global slot-related data:
- // sockets, zombies, empty_slots, terminated. It also synchronises
+ // sockets, empty_slots, terminating. It also synchronises
// access to zombie sockets as such (as oposed to slots) and provides
// a memory barrier to ensure that all CPU cores see the same data.
mutex_t slot_sync;
- // This function attempts to deallocate as many zombie sockets as
- // possible. It must be called within a slot_sync critical section.
- void dezombify ();
+ // The reaper thread.
+ class reaper_t *reaper;
// I/O threads.
typedef std::vector <class io_thread_t*> io_threads_t;
@@ -130,6 +126,9 @@ namespace zmq
uint32_t slot_count;
mailbox_t **slots;
+ // Mailbox for zmq_term thread.
+ mailbox_t term_mailbox;
+
// List of inproc endpoints within this context.
typedef std::map <std::string, endpoint_t> endpoints_t;
endpoints_t endpoints;
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index 7ba8905..ca768f5 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -75,7 +75,7 @@ void zmq::io_thread_t::in_event ()
if (rc != 0 && errno == EINTR)
continue;
if (rc != 0 && errno == EAGAIN)
- break;
+ break;
errno_assert (rc == 0);
// Process the command.
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index b01eecb..b3ea484 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -77,6 +77,9 @@ namespace zmq
// I/O multiplexing is performed using a poller object.
poller_t *poller;
+
+ io_thread_t (const io_thread_t&);
+ const io_thread_t &operator = (const io_thread_t&);
};
}
diff --git a/src/object.cpp b/src/object.cpp
index 63b42b4..823d4b6 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -114,6 +114,10 @@ void zmq::object_t::process_command (command_t &cmd_)
process_term_ack ();
break;
+ case command_t::reap:
+ process_reap (cmd_.args.reap.socket);
+ break;
+
default:
zmq_assert (false);
}
@@ -138,6 +142,11 @@ zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
return ctx->find_endpoint (addr_);
}
+void zmq::object_t::destroy_socket (socket_base_t *socket_)
+{
+ ctx->destroy_socket (socket_);
+}
+
void zmq::object_t::log (zmq_msg_t *msg_)
{
ctx->log (msg_);
@@ -148,11 +157,6 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
return ctx->choose_io_thread (affinity_);
}
-void zmq::object_t::zombify_socket (socket_base_t *socket_)
-{
- ctx->zombify_socket (socket_);
-}
-
void zmq::object_t::send_stop ()
{
// 'stop' command goes always from administrative thread to
@@ -336,6 +340,29 @@ void zmq::object_t::send_term_ack (own_t *destination_)
send_command (cmd);
}
+void zmq::object_t::send_reap (class socket_base_t *socket_)
+{
+ command_t cmd;
+#if defined ZMQ_MAKE_VALGRIND_HAPPY
+ memset (&cmd, 0, sizeof (cmd));
+#endif
+ cmd.destination = ctx->get_reaper ();
+ cmd.type = command_t::reap;
+ cmd.args.reap.socket = socket_;
+ send_command (cmd);
+}
+
+void zmq::object_t::send_done ()
+{
+ command_t cmd;
+#if defined ZMQ_MAKE_VALGRIND_HAPPY
+ memset (&cmd, 0, sizeof (cmd));
+#endif
+ cmd.destination = NULL;
+ cmd.type = command_t::done;
+ ctx->send_command (ctx_t::term_tid, cmd);
+}
+
void zmq::object_t::process_stop ()
{
zmq_assert (false);
@@ -398,6 +425,11 @@ void zmq::object_t::process_term_ack ()
zmq_assert (false);
}
+void zmq::object_t::process_reap (class socket_base_t *socket_)
+{
+ zmq_assert (false);
+}
+
void zmq::object_t::process_seqnum ()
{
zmq_assert (false);
diff --git a/src/object.hpp b/src/object.hpp
index 0fd6f47..cee82c8 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -49,6 +49,7 @@ namespace zmq
int register_endpoint (const char *addr_, struct endpoint_t &endpoint_);
void unregister_endpoints (class socket_base_t *socket_);
struct endpoint_t find_endpoint (const char *addr_);
+ void destroy_socket (class socket_base_t *socket_);
// Logs an message.
void log (zmq_msg_t *msg_);
@@ -56,10 +57,6 @@ namespace zmq
// Chooses least loaded I/O thread.
class io_thread_t *choose_io_thread (uint64_t affinity_);
- // Zombify particular socket. In other words, pass the ownership to
- // the context.
- void zombify_socket (class socket_base_t *socket_);
-
// Derived object can use these functions to send commands
// to other objects.
void send_stop ();
@@ -82,6 +79,8 @@ namespace zmq
class own_t *object_);
void send_term (class own_t *destination_, int linger_);
void send_term_ack (class own_t *destination_);
+ void send_reap (class socket_base_t *socket_);
+ void send_done ();
// These handlers can be overloaded by the derived objects. They are
// called when command arrives from another thread.
@@ -99,6 +98,7 @@ namespace zmq
virtual void process_term_req (class own_t *object_);
virtual void process_term (int linger_);
virtual void process_term_ack ();
+ virtual void process_reap (class socket_base_t *socket_);
// Special handler called after a command that requires a seqnum
// was processed. The implementation should catch up with its counter
diff --git a/src/reaper.cpp b/src/reaper.cpp
new file mode 100644
index 0000000..f114f56
--- /dev/null
+++ b/src/reaper.cpp
@@ -0,0 +1,139 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "reaper.hpp"
+#include "socket_base.hpp"
+#include "err.hpp"
+
+zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
+ object_t (ctx_, tid_),
+ terminating (false),
+ has_timer (false)
+{
+ poller = new (std::nothrow) poller_t;
+ zmq_assert (poller);
+
+ mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
+ poller->set_pollin (mailbox_handle);
+}
+
+zmq::reaper_t::~reaper_t ()
+{
+ delete poller;
+}
+
+zmq::mailbox_t *zmq::reaper_t::get_mailbox ()
+{
+ return &mailbox;
+}
+
+void zmq::reaper_t::start ()
+{
+ // Start the thread.
+ poller->start ();
+}
+
+void zmq::reaper_t::stop ()
+{
+ send_stop ();
+}
+
+void zmq::reaper_t::in_event ()
+{
+ while (true) {
+
+ // Get the next command. If there is none, exit.
+ command_t cmd;
+ int rc = mailbox.recv (&cmd, false);
+ if (rc != 0 && errno == EINTR)
+ continue;
+ if (rc != 0 && errno == EAGAIN)
+ break;
+ errno_assert (rc == 0);
+
+ // Process the command.
+ cmd.destination->process_command (cmd);
+ }
+}
+
+void zmq::reaper_t::out_event ()
+{
+ // We are never polling for POLLOUT here. This function is never called.
+ zmq_assert (false);
+}
+
+void zmq::reaper_t::timer_event (int id_)
+{
+ zmq_assert (has_timer);
+ has_timer = false;
+ reap ();
+}
+
+void zmq::reaper_t::reap ()
+{
+ // Try to reap each socket in the list.
+ for (sockets_t::iterator it = sockets.begin (); it != sockets.end ();) {
+ if ((*it)->reap ()) {
+
+ // MSVC version of STL requires this to be done a spacial way...
+#if defined _MSC_VER
+ it = sockets.erase (it);
+#else
+ sockets.erase (it);
+#endif
+ }
+ else
+ ++it;
+ }
+
+ // If there are still sockets to reap, wait a while, then try again.
+ if (!sockets.empty () && !has_timer) {
+ poller->add_timer (1 , this, 0);
+ has_timer = true;
+ return;
+ }
+
+ // No more sockets and the context is already shutting down.
+ if (terminating) {
+ send_done ();
+ poller->rm_fd (mailbox_handle);
+ poller->stop ();
+ return;
+ }
+}
+
+void zmq::reaper_t::process_stop ()
+{
+ terminating = true;
+
+ if (sockets.empty ()) {
+ send_done ();
+ poller->rm_fd (mailbox_handle);
+ poller->stop ();
+ }
+}
+
+void zmq::reaper_t::process_reap (socket_base_t *socket_)
+{
+ // Start termination of associated I/O object hierarchy.
+ socket_->terminate ();
+ sockets.push_back (socket_);
+ reap ();
+}
+
diff --git a/src/reaper.hpp b/src/reaper.hpp
new file mode 100644
index 0000000..8598cd9
--- /dev/null
+++ b/src/reaper.hpp
@@ -0,0 +1,81 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_REAPER_HPP_INCLUDED__
+#define __ZMQ_REAPER_HPP_INCLUDED__
+
+#include "object.hpp"
+#include "mailbox.hpp"
+#include "poller.hpp"
+#include "i_poll_events.hpp"
+
+namespace zmq
+{
+
+ class reaper_t : public object_t, public i_poll_events
+ {
+ public:
+
+ reaper_t (class ctx_t *ctx_, uint32_t tid_);
+ ~reaper_t ();
+
+ mailbox_t *get_mailbox ();
+
+ void start ();
+ void stop ();
+
+ // i_poll_events implementation.
+ void in_event ();
+ void out_event ();
+ void timer_event (int id_);
+
+ private:
+
+ void reap ();
+
+ // Command handlers.
+ void process_stop ();
+ void process_reap (class socket_base_t *socket_);
+
+ // List of all sockets being terminated.
+ typedef std::vector <class socket_base_t*> sockets_t;
+ sockets_t sockets;
+
+ // Reaper thread accesses incoming commands via this mailbox.
+ mailbox_t mailbox;
+
+ // Handle associated with mailbox' file descriptor.
+ poller_t::handle_t mailbox_handle;
+
+ // I/O multiplexing is performed using a poller object.
+ poller_t *poller;
+
+ // If true, we were already asked to terminate.
+ bool terminating;
+
+ // If true, timer till next reaping is running.
+ bool has_timer;
+
+ reaper_t (const reaper_t&);
+ const reaper_t &operator = (const reaper_t&);
+ };
+
+}
+
+#endif
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index f19187f..0643d4d 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -570,13 +570,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::close ()
{
- // Start termination of associated I/O object hierarchy.
- terminate ();
-
- // Ask context to zombify this socket. In other words, transfer
- // the ownership of the socket from this application thread
- // to the context which will take care of the rest of shutdown process.
- zombify_socket (this);
+ // Transfer the ownership of the socket from this application thread
+ // to the reaper thread which will take care of the rest of shutdown
+ // process.
+ send_reap (this);
return 0;
}
@@ -627,7 +624,7 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)
return session;
}
-bool zmq::socket_base_t::dezombify ()
+bool zmq::socket_base_t::reap ()
{
// Process any commands from other threads/sockets that may be available
// at the moment. Ultimately, socket will be destroyed.
@@ -635,6 +632,11 @@ bool zmq::socket_base_t::dezombify ()
// If the object was already marked as destroyed, finish the deallocation.
if (destroyed) {
+
+ // Remove the socket from the context.
+ destroy_socket (this);
+
+ // Deallocate.
own_t::process_destroy ();
return true;
}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 057f8b1..a74b7d0 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -42,6 +42,8 @@ namespace zmq
public own_t,
public array_item_t
{
+ friend class reaper_t;
+
public:
// Create a socket of a specified type.
@@ -82,9 +84,9 @@ namespace zmq
void activated (class writer_t *pipe_);
void terminated (class writer_t *pipe_);
- // This function should be called only on zombie sockets. It tries
- // to deallocate the zombie. Returns true is object is destroyed.
- bool dezombify ();
+ // This function should be called only on sockets that are already
+ // closed -- from the reaper thread. It tries to finalise the socket.
+ bool reap ();
protected: