summaryrefslogtreecommitdiff
path: root/src/ctx.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-02-09 15:32:15 +0100
committerMartin Sustrik <sustrik@250bpm.com>2011-02-09 15:32:15 +0100
commit80ac398bba928fa7f245d2e107071677a13185cf (patch)
tree57c51a7adb5f4cb1a91396fe0b223538a4d428d7 /src/ctx.cpp
parent889424e675eecd9d9c7d1121456401d5c43029a5 (diff)
Initial implementation of reaper thread.
Reaper thread destroys the socket asynchronously. zmq_term() can be interrupted by a signal (EINTR). zmq_socket() will return ETERM after zmq_term() was called. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/ctx.cpp')
-rw-r--r--src/ctx.cpp158
1 files changed, 73 insertions, 85 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 91b0236..10c91c9 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -24,6 +24,7 @@
#include "socket_base.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
+#include "reaper.hpp"
#include "err.hpp"
#include "pipe.hpp"
@@ -34,7 +35,7 @@
#endif
zmq::ctx_t::ctx_t (uint32_t io_threads_) :
- no_sockets_notify (false)
+ terminating (false)
{
int rc;
@@ -49,13 +50,23 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
HIBYTE (wsa_data.wVersion) == 2);
#endif
- // Initialise the array of mailboxes. +1 accounts for internal log socket.
- slot_count = max_sockets + io_threads_ + 1;
+ // Initialise the array of mailboxes. Additional three slots are for
+ // internal log socket and the zmq_term thread the reaper thread.
+ slot_count = max_sockets + io_threads_ + 3;
slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
zmq_assert (slots);
+ // Initialise the infrastructure for zmq_term thread.
+ slots [term_tid] = &term_mailbox;
+
+ // Create the reaper thread.
+ reaper = new (std::nothrow) reaper_t (this, reaper_tid);
+ zmq_assert (reaper);
+ slots [reaper_tid] = reaper->get_mailbox ();
+ reaper->start ();
+
// Create I/O thread objects and launch them.
- for (uint32_t i = 0; i != io_threads_; i++) {
+ for (uint32_t i = 2; i != io_threads_ + 2; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
zmq_assert (io_thread);
io_threads.push_back (io_thread);
@@ -65,7 +76,7 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
// In the unused part of the slot array, create a list of empty slots.
for (int32_t i = (int32_t) slot_count - 1;
- i >= (int32_t) io_threads_; i--) {
+ i >= (int32_t) io_threads_ + 2; i--) {
empty_slots.push_back (i);
slots [i] = NULL;
}
@@ -79,9 +90,8 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
zmq::ctx_t::~ctx_t ()
{
- // Check that there are no remaining open or zombie sockets.
+ // Check that there are no remaining sockets.
zmq_assert (sockets.empty ());
- zmq_assert (zombies.empty ());
// Ask I/O threads to terminate. If stop signal wasn't sent to I/O
// thread subsequent invocation of destructor would hang-up.
@@ -92,6 +102,9 @@ zmq::ctx_t::~ctx_t ()
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
delete io_threads [i];
+ // Deallocate the reaper thread object.
+ delete reaper;
+
// Deallocate the array of mailboxes. No special work is
// needed as mailboxes themselves were deallocated with their
// corresponding io_thread/socket objects.
@@ -106,52 +119,42 @@ zmq::ctx_t::~ctx_t ()
int zmq::ctx_t::terminate ()
{
- // Close the logging infrastructure.
- log_sync.lock ();
- int rc = log_socket->close ();
- zmq_assert (rc == 0);
- log_socket = NULL;
- log_sync.unlock ();
-
- // First send stop command to sockets so that any
- // blocking calls are interrupted.
+ // Check whether termination was already underway, but interrupted and now
+ // restarted.
slot_sync.lock ();
- for (sockets_t::size_type i = 0; i != sockets.size (); i++)
- sockets [i]->stop ();
- if (!sockets.empty ())
- no_sockets_notify = true;
+ bool restarted = terminating;
slot_sync.unlock ();
- // Find out whether there are any open sockets to care about.
- // If there are open sockets, sleep till they are closed. Note that we can
- // use no_sockets_notify safely out of the critical section as once set
- // its value is never changed again.
- if (no_sockets_notify)
- no_sockets_sync.wait ();
+ // First attempt to terminate the context.
+ if (!restarted) {
+
+ // Close the logging infrastructure.
+ log_sync.lock ();
+ int rc = log_socket->close ();
+ zmq_assert (rc == 0);
+ log_socket = NULL;
+ log_sync.unlock ();
+
+ // First send stop command to sockets so that any blocking calls can be
+ // interrupted. If there are no sockets we can ask reaper thread to stop.
+ slot_sync.lock ();
+ terminating = true;
+ for (sockets_t::size_type i = 0; i != sockets.size (); i++)
+ sockets [i]->stop ();
+ if (sockets.empty ())
+ reaper->stop ();
+ slot_sync.unlock ();
+ }
- // Note that the lock won't block anyone here. There's noone else having
- // open sockets anyway. The only purpose of the lock is to double-check all
- // the CPU caches have been synchronised.
+ // Wait till reaper thread closes all the sockets.
+ command_t cmd;
+ int rc = term_mailbox.recv (&cmd, true);
+ if (rc == -1 && errno == EINTR)
+ return -1;
+ zmq_assert (rc == 0);
+ zmq_assert (cmd.type == command_t::done);
slot_sync.lock ();
-
- // At this point there should be no active sockets. What we have is a set
- // of zombies waiting to be dezombified.
zmq_assert (sockets.empty ());
-
- // Get rid of remaining zombie sockets.
- while (!zombies.empty ()) {
- dezombify ();
-
- // Sleep for 1ms not to end up busy-looping in the case the I/O threads
- // are still busy sending data. We can possibly add a grand poll here
- // (polling for fds associated with all the zombie sockets), but it's
- // probably not worth of implementing it.
-#if defined ZMQ_HAVE_WINDOWS
- Sleep (1);
-#else
- usleep (1000);
-#endif
- }
slot_sync.unlock ();
// Deallocate the resources.
@@ -164,8 +167,12 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
slot_sync.lock ();
- // Free the slots, if possible.
- dezombify ();
+ // Once zmq_term() was called, we can't create new sockets.
+ if (terminating) {
+ slot_sync.unlock ();
+ errno = ETERM;
+ return NULL;
+ }
// If max_sockets limit was reached, return error.
if (empty_slots.empty ()) {
@@ -193,29 +200,31 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
return s;
}
-void zmq::ctx_t::zombify_socket (socket_base_t *socket_)
+void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
{
- // Zombification of socket basically means that its ownership is tranferred
- // from the application that created it to the context.
-
- // Note that the lock provides the memory barrier needed to migrate
- // zombie-to-be socket from it's native thread to shared data area
- // synchronised by slot_sync.
slot_sync.lock ();
- sockets.erase (socket_);
- zombies.push_back (socket_);
- // Try to get rid of at least some zombie sockets at this point.
- dezombify ();
+ // Free the associared thread slot.
+ uint32_t tid = socket_->get_tid ();
+ empty_slots.push_back (tid);
+ slots [tid] = NULL;
- // If shutdown thread is interested in notification about no more
- // open sockets, notify it now.
- if (sockets.empty () && no_sockets_notify)
- no_sockets_sync.post ();
+ // Remove the socket from the list of sockets.
+ sockets.erase (socket_);
+
+ // If zmq_term() was already called and there are no more socket
+ // we can ask reaper thread to terminate.
+ if (terminating && sockets.empty ())
+ reaper->stop ();
slot_sync.unlock ();
}
+zmq::object_t *zmq::ctx_t::get_reaper ()
+{
+ return reaper;
+}
+
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{
slots [tid_]->send (command_);
@@ -309,25 +318,4 @@ void zmq::ctx_t::log (zmq_msg_t *msg_)
log_sync.unlock ();
}
-void zmq::ctx_t::dezombify ()
-{
- // Try to dezombify each zombie in the list. Note that caller is
- // responsible for calling this method in the slot_sync critical section.
- for (zombies_t::iterator it = zombies.begin (); it != zombies.end ();) {
- uint32_t tid = (*it)->get_tid ();
- if ((*it)->dezombify ()) {
-#if defined _MSC_VER
-
- // HP implementation of STL requires doing it this way...
- it = zombies.erase (it);
-#else
- zombies.erase (it);
-#endif
- empty_slots.push_back (tid);
- slots [tid] = NULL;
- }
- else
- ++it;
- }
-}