diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2010-08-12 15:03:51 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-08-25 15:39:20 +0200 |
commit | 45f83d78a56f4b3a812c87fec03a75558445b2ab (patch) | |
tree | 1adba1798c914baf65929d89ed9725dd68672bd6 | |
parent | 936dbf956b0f1471a96fc06bcba67765257dbc4a (diff) |
one more dezombification bug fixed
-rw-r--r-- | src/ctx.cpp | 33 | ||||
-rw-r--r-- | src/ctx.hpp | 7 | ||||
-rw-r--r-- | src/object.cpp | 5 | ||||
-rw-r--r-- | src/object.hpp | 3 | ||||
-rw-r--r-- | src/own.cpp | 7 | ||||
-rw-r--r-- | src/own.hpp | 5 | ||||
-rw-r--r-- | src/socket_base.cpp | 21 | ||||
-rw-r--r-- | src/socket_base.hpp | 12 |
8 files changed, 46 insertions, 47 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp index a958833..79145eb 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -201,22 +201,6 @@ void zmq::ctx_t::zombify_socket (socket_base_t *socket_) slot_sync.unlock (); } -void zmq::ctx_t::dezombify_socket (socket_base_t *socket_) -{ - // We assume that this function is called only within dezombification - // process, which in turn is running within a slot_sync critical section. - // Therefore, we need no locking here. - - // TODO: Can we do this better than O(n)? - zombies_t::iterator it = std::find (zombies.begin (), zombies.end (), - socket_); - zmq_assert (it != zombies.end ()); - - // Move from the slot from 'zombie' to 'empty' state. - empty_slots.push_back ((*it)->get_slot ()); - zombies.erase (it); -} - void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_) { slots [slot_]->send (command_); @@ -307,14 +291,15 @@ void zmq::ctx_t::dezombify () { // Try to dezombify each zombie in the list. Note that caller is // responsible for calling this method in the slot_sync critical section. - zombies_t::iterator it = zombies.begin (); - while (it != zombies.end ()) { - zombies_t::iterator old = it; - ++it; - - // dezombify_socket can be called here that will invalidate - // the iterator. That's why we've got the next zombie beforehand. - (*old)->dezombify (); + for (zombies_t::iterator it = zombies.begin (); it != zombies.end ();) { + uint32_t slot = (*it)->get_slot (); + if ((*it)->dezombify ()) { + zombies.erase (it); + empty_slots.push_back (slot); + slots [slot] = NULL; + } + else + it++; } } diff --git a/src/ctx.hpp b/src/ctx.hpp index 1b51151..2394c70 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -20,9 +20,7 @@ #ifndef __ZMQ_CTX_HPP_INCLUDED__ #define __ZMQ_CTX_HPP_INCLUDED__ -#include <set> #include <map> -#include <list> #include <vector> #include <string> @@ -61,9 +59,6 @@ namespace zmq // Make socket a zombie. void zombify_socket (socket_base_t *socket_); - // Kill the zombie socket. - void dezombify_socket (socket_base_t *socket_); - // Send command to the destination slot. void send_command (uint32_t slot_, const command_t &command_); @@ -89,7 +84,7 @@ namespace zmq // List of sockets that were already closed but not yet deallocated. // These sockets still have some pipes and I/O objects attached. - typedef std::list <socket_base_t*> zombies_t; + typedef std::vector <socket_base_t*> zombies_t; zombies_t zombies; // List of unused slots. diff --git a/src/object.cpp b/src/object.cpp index 3296fcf..a8294b0 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -147,11 +147,6 @@ void zmq::object_t::zombify_socket (socket_base_t *socket_) ctx->zombify_socket (socket_); } -void zmq::object_t::dezombify_socket (socket_base_t *socket_) -{ - ctx->dezombify_socket (socket_); -} - void zmq::object_t::send_stop () { // 'stop' command goes always from administrative thread to diff --git a/src/object.hpp b/src/object.hpp index f146b25..e083ce3 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -55,9 +55,6 @@ namespace zmq // the context. void zombify_socket (class socket_base_t *socket_); - // Dezombify particular socket, i.e. destroy it. - void dezombify_socket (class socket_base_t *socket_); - // Derived object can use these functions to send commands // to other objects. void send_stop (); diff --git a/src/own.cpp b/src/own.cpp index d90e9c4..f8252ab 100644 --- a/src/own.cpp +++ b/src/own.cpp @@ -192,7 +192,12 @@ void zmq::own_t::check_term_acks () send_term_ack (owner); // Deallocate the resources. - delete this; + process_destroy (); } } +void zmq::own_t::process_destroy () +{ + delete this; +} + diff --git a/src/own.hpp b/src/own.hpp index dc14fcc..b65177e 100644 --- a/src/own.hpp +++ b/src/own.hpp @@ -85,6 +85,10 @@ namespace zmq void register_term_acks (int count_); void unregister_term_ack (); + // A place to hook in when phyicallal destruction of the object + // is to be delayed. + virtual void process_destroy (); + private: // Set owner of the object @@ -94,7 +98,6 @@ namespace zmq void process_own (own_t *object_); void process_term_req (own_t *object_); void process_term_ack (); - void process_seqnum (); // Check whether all the peding term acks were delivered. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 89b8a29..060480f 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -110,6 +110,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) : own_t (parent_, slot_), zombie (false), + destroyed (false), last_processing_time (0), ticks (0), rcvmore (false) @@ -118,15 +119,12 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) : zmq::socket_base_t::~socket_base_t () { - zmq_assert (zombie); + zmq_assert (zombie && destroyed); // Check whether there are no session leaks. sessions_sync.lock (); zmq_assert (sessions.empty ()); sessions_sync.unlock (); - - // Mark the socket slot as empty. - dezombify_socket (this); } zmq::signaler_t *zmq::socket_base_t::get_signaler () @@ -604,13 +602,21 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_) return session; } -void zmq::socket_base_t::dezombify () +bool zmq::socket_base_t::dezombify () { zmq_assert (zombie); // Process any commands from other threads/sockets that may be available // at the moment. Ultimately, socket will be destroyed. process_commands (false, false); + + // If the object was already marked as destroyed, finish the deallocation. + if (destroyed) { + own_t::process_destroy (); + return true; + } + + return false; } void zmq::socket_base_t::process_commands (bool block_, bool throttle_) @@ -705,6 +711,11 @@ void zmq::socket_base_t::process_term () own_t::process_term (); } +void zmq::socket_base_t::process_destroy () +{ + destroyed = true; +} + int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 785967e..1d8c4ff 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -85,8 +85,8 @@ namespace zmq void terminated (class writer_t *pipe_); // This function should be called only on zombie sockets. It tries - // to deallocate the zombie. - void dezombify (); + // to deallocate the zombie. Returns true is object is destroyed. + bool dezombify (); protected: @@ -120,6 +120,9 @@ namespace zmq // by overloading it. void process_term (); + // Delay actual destruction of the socket. + void process_destroy (); + private: // TODO: Check whether we still need this flag... @@ -128,6 +131,11 @@ namespace zmq // attached to the socket. bool zombie; + // If true, object should have been already destroyed. However, + // destruction is delayed while we unwind the stack to the point + // where it doesn't intersect the object being destroyed. + bool destroyed; + // Check whether transport protocol, as specified in connect or // bind, is available and compatible with the socket type. int check_protocol (const std::string &protocol_); |