diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2010-08-12 08:16:18 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-08-25 15:39:20 +0200 |
commit | 936dbf956b0f1471a96fc06bcba67765257dbc4a (patch) | |
tree | b23704ec1d4d6f8c6c94e55919fcfcc1d0f26d6a | |
parent | 76bd6e73c335dbebd8bd30565f83a810058f2cc8 (diff) |
dezombification procedure fixed
-rw-r--r-- | src/ctx.cpp | 46 | ||||
-rw-r--r-- | src/ctx.hpp | 12 | ||||
-rw-r--r-- | src/object.cpp | 5 | ||||
-rw-r--r-- | src/object.hpp | 3 | ||||
-rw-r--r-- | src/socket_base.cpp | 10 | ||||
-rw-r--r-- | src/socket_base.hpp | 4 | ||||
-rw-r--r-- | src/zmq.cpp | 2 |
7 files changed, 59 insertions, 23 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp index d096b91..a958833 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -97,7 +97,7 @@ zmq::ctx_t::~ctx_t () #endif } -int zmq::ctx_t::term () +int zmq::ctx_t::terminate () { // First send stop command to sockets so that any // blocking calls are interrupted. @@ -115,12 +115,15 @@ int zmq::ctx_t::term () if (no_sockets_notify) no_sockets_sync.wait (); - // At this point there's only one application thread (this one) remaining. - // We don't even have to synchronise access to data. + // At this point there should be no active sockets. What we have is a set + // of zombies waiting to be dezombified. zmq_assert (sockets.empty ()); -// TODO: We are accessing the list of zombies in unsynchronised manner here! - // Get rid of remaining zombie sockets. + // Get rid of remaining zombie sockets. Note that the lock won't block + // anyone here. There's noone else having open sockets anyway. The only + // purpose of the lock is to double-check all the CPU caches have been + // synchronised. + slot_sync.lock (); while (!zombies.empty ()) { dezombify (); @@ -134,6 +137,7 @@ int zmq::ctx_t::term () usleep (1000); #endif } + slot_sync.unlock (); // Deallocate the resources. delete this; @@ -197,6 +201,22 @@ void zmq::ctx_t::zombify_socket (socket_base_t *socket_) slot_sync.unlock (); } +void zmq::ctx_t::dezombify_socket (socket_base_t *socket_) +{ + // We assume that this function is called only within dezombification + // process, which in turn is running within a slot_sync critical section. + // Therefore, we need no locking here. + + // TODO: Can we do this better than O(n)? + zombies_t::iterator it = std::find (zombies.begin (), zombies.end (), + socket_); + zmq_assert (it != zombies.end ()); + + // Move from the slot from 'zombie' to 'empty' state. + empty_slots.push_back ((*it)->get_slot ()); + zombies.erase (it); +} + void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_) { slots [slot_]->send (command_); @@ -287,12 +307,14 @@ void zmq::ctx_t::dezombify () { // Try to dezombify each zombie in the list. Note that caller is // responsible for calling this method in the slot_sync critical section. - for (zombies_t::size_type i = 0; i != zombies.size ();) - if (zombies [i]->dezombify ()) { - empty_slots.push_back (zombies [i]->get_slot ()); - zombies.erase (zombies [i]); - } - else - i++; + zombies_t::iterator it = zombies.begin (); + while (it != zombies.end ()) { + zombies_t::iterator old = it; + ++it; + + // dezombify_socket can be called here that will invalidate + // the iterator. That's why we've got the next zombie beforehand. + (*old)->dezombify (); + } } diff --git a/src/ctx.hpp b/src/ctx.hpp index c44cca6..1b51151 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -20,9 +20,10 @@ #ifndef __ZMQ_CTX_HPP_INCLUDED__ #define __ZMQ_CTX_HPP_INCLUDED__ -#include <vector> #include <set> #include <map> +#include <list> +#include <vector> #include <string> #include "signaler.hpp" @@ -52,7 +53,7 @@ namespace zmq // no more sockets open it'll cause all the infrastructure to be shut // down. If there are open sockets still, the deallocation happens // after the last one is closed. - int term (); + int terminate (); // Create a socket. class socket_base_t *create_socket (int type_); @@ -60,6 +61,9 @@ namespace zmq // Make socket a zombie. void zombify_socket (socket_base_t *socket_); + // Kill the zombie socket. + void dezombify_socket (socket_base_t *socket_); + // Send command to the destination slot. void send_command (uint32_t slot_, const command_t &command_); @@ -83,9 +87,9 @@ namespace zmq typedef yarray_t <socket_base_t> sockets_t; sockets_t sockets; - // Array of sockets that were already closed but not yet deallocated. + // List of sockets that were already closed but not yet deallocated. // These sockets still have some pipes and I/O objects attached. - typedef yarray_t <socket_base_t> zombies_t; + typedef std::list <socket_base_t*> zombies_t; zombies_t zombies; // List of unused slots. diff --git a/src/object.cpp b/src/object.cpp index a8294b0..3296fcf 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -147,6 +147,11 @@ void zmq::object_t::zombify_socket (socket_base_t *socket_) ctx->zombify_socket (socket_); } +void zmq::object_t::dezombify_socket (socket_base_t *socket_) +{ + ctx->dezombify_socket (socket_); +} + void zmq::object_t::send_stop () { // 'stop' command goes always from administrative thread to diff --git a/src/object.hpp b/src/object.hpp index e083ce3..f146b25 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -55,6 +55,9 @@ namespace zmq // the context. void zombify_socket (class socket_base_t *socket_); + // Dezombify particular socket, i.e. destroy it. + void dezombify_socket (class socket_base_t *socket_); + // Derived object can use these functions to send commands // to other objects. void send_stop (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 903e781..89b8a29 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -118,10 +118,15 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) : zmq::socket_base_t::~socket_base_t () { + zmq_assert (zombie); + // Check whether there are no session leaks. sessions_sync.lock (); zmq_assert (sessions.empty ()); sessions_sync.unlock (); + + // Mark the socket slot as empty. + dezombify_socket (this); } zmq::signaler_t *zmq::socket_base_t::get_signaler () @@ -599,16 +604,13 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_) return session; } -bool zmq::socket_base_t::dezombify () +void zmq::socket_base_t::dezombify () { zmq_assert (zombie); // Process any commands from other threads/sockets that may be available // at the moment. Ultimately, socket will be destroyed. process_commands (false, false); - -// TODO: ??? - return true; } void zmq::socket_base_t::process_commands (bool block_, bool throttle_) diff --git a/src/socket_base.hpp b/src/socket_base.hpp index f76dc4c..785967e 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -85,8 +85,8 @@ namespace zmq void terminated (class writer_t *pipe_); // This function should be called only on zombie sockets. It tries - // to deallocate the zombie. Returns true if zombie is finally dead. - bool dezombify (); + // to deallocate the zombie. + void dezombify (); protected: diff --git a/src/zmq.cpp b/src/zmq.cpp index 1a74f86..e9bfc53 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -269,7 +269,7 @@ void *zmq_init (int io_threads_) int zmq_term (void *ctx_) { - int rc = ((zmq::ctx_t*) ctx_)->term (); + int rc = ((zmq::ctx_t*) ctx_)->terminate (); int en = errno; if (!ctx_) { |