From 936dbf956b0f1471a96fc06bcba67765257dbc4a Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 12 Aug 2010 08:16:18 +0200 Subject: dezombification procedure fixed --- src/ctx.cpp | 46 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 34 insertions(+), 12 deletions(-) (limited to 'src/ctx.cpp') diff --git a/src/ctx.cpp b/src/ctx.cpp index d096b91..a958833 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -97,7 +97,7 @@ zmq::ctx_t::~ctx_t () #endif } -int zmq::ctx_t::term () +int zmq::ctx_t::terminate () { // First send stop command to sockets so that any // blocking calls are interrupted. @@ -115,12 +115,15 @@ int zmq::ctx_t::term () if (no_sockets_notify) no_sockets_sync.wait (); - // At this point there's only one application thread (this one) remaining. - // We don't even have to synchronise access to data. + // At this point there should be no active sockets. What we have is a set + // of zombies waiting to be dezombified. zmq_assert (sockets.empty ()); -// TODO: We are accessing the list of zombies in unsynchronised manner here! - // Get rid of remaining zombie sockets. + // Get rid of remaining zombie sockets. Note that the lock won't block + // anyone here. There's noone else having open sockets anyway. The only + // purpose of the lock is to double-check all the CPU caches have been + // synchronised. + slot_sync.lock (); while (!zombies.empty ()) { dezombify (); @@ -134,6 +137,7 @@ int zmq::ctx_t::term () usleep (1000); #endif } + slot_sync.unlock (); // Deallocate the resources. delete this; @@ -197,6 +201,22 @@ void zmq::ctx_t::zombify_socket (socket_base_t *socket_) slot_sync.unlock (); } +void zmq::ctx_t::dezombify_socket (socket_base_t *socket_) +{ + // We assume that this function is called only within dezombification + // process, which in turn is running within a slot_sync critical section. + // Therefore, we need no locking here. + + // TODO: Can we do this better than O(n)? + zombies_t::iterator it = std::find (zombies.begin (), zombies.end (), + socket_); + zmq_assert (it != zombies.end ()); + + // Move from the slot from 'zombie' to 'empty' state. + empty_slots.push_back ((*it)->get_slot ()); + zombies.erase (it); +} + void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_) { slots [slot_]->send (command_); @@ -287,12 +307,14 @@ void zmq::ctx_t::dezombify () { // Try to dezombify each zombie in the list. Note that caller is // responsible for calling this method in the slot_sync critical section. - for (zombies_t::size_type i = 0; i != zombies.size ();) - if (zombies [i]->dezombify ()) { - empty_slots.push_back (zombies [i]->get_slot ()); - zombies.erase (zombies [i]); - } - else - i++; + zombies_t::iterator it = zombies.begin (); + while (it != zombies.end ()) { + zombies_t::iterator old = it; + ++it; + + // dezombify_socket can be called here that will invalidate + // the iterator. That's why we've got the next zombie beforehand. + (*old)->dezombify (); + } } -- cgit v1.2.3