summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-08-12 15:03:51 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-08-25 15:39:20 +0200
commit45f83d78a56f4b3a812c87fec03a75558445b2ab (patch)
tree1adba1798c914baf65929d89ed9725dd68672bd6
parent936dbf956b0f1471a96fc06bcba67765257dbc4a (diff)
one more dezombification bug fixed
-rw-r--r--src/ctx.cpp33
-rw-r--r--src/ctx.hpp7
-rw-r--r--src/object.cpp5
-rw-r--r--src/object.hpp3
-rw-r--r--src/own.cpp7
-rw-r--r--src/own.hpp5
-rw-r--r--src/socket_base.cpp21
-rw-r--r--src/socket_base.hpp12
8 files changed, 46 insertions, 47 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp
index a958833..79145eb 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -201,22 +201,6 @@ void zmq::ctx_t::zombify_socket (socket_base_t *socket_)
slot_sync.unlock ();
}
-void zmq::ctx_t::dezombify_socket (socket_base_t *socket_)
-{
- // We assume that this function is called only within dezombification
- // process, which in turn is running within a slot_sync critical section.
- // Therefore, we need no locking here.
-
- // TODO: Can we do this better than O(n)?
- zombies_t::iterator it = std::find (zombies.begin (), zombies.end (),
- socket_);
- zmq_assert (it != zombies.end ());
-
- // Move from the slot from 'zombie' to 'empty' state.
- empty_slots.push_back ((*it)->get_slot ());
- zombies.erase (it);
-}
-
void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_)
{
slots [slot_]->send (command_);
@@ -307,14 +291,15 @@ void zmq::ctx_t::dezombify ()
{
// Try to dezombify each zombie in the list. Note that caller is
// responsible for calling this method in the slot_sync critical section.
- zombies_t::iterator it = zombies.begin ();
- while (it != zombies.end ()) {
- zombies_t::iterator old = it;
- ++it;
-
- // dezombify_socket can be called here that will invalidate
- // the iterator. That's why we've got the next zombie beforehand.
- (*old)->dezombify ();
+ for (zombies_t::iterator it = zombies.begin (); it != zombies.end ();) {
+ uint32_t slot = (*it)->get_slot ();
+ if ((*it)->dezombify ()) {
+ zombies.erase (it);
+ empty_slots.push_back (slot);
+ slots [slot] = NULL;
+ }
+ else
+ it++;
}
}
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 1b51151..2394c70 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -20,9 +20,7 @@
#ifndef __ZMQ_CTX_HPP_INCLUDED__
#define __ZMQ_CTX_HPP_INCLUDED__
-#include <set>
#include <map>
-#include <list>
#include <vector>
#include <string>
@@ -61,9 +59,6 @@ namespace zmq
// Make socket a zombie.
void zombify_socket (socket_base_t *socket_);
- // Kill the zombie socket.
- void dezombify_socket (socket_base_t *socket_);
-
// Send command to the destination slot.
void send_command (uint32_t slot_, const command_t &command_);
@@ -89,7 +84,7 @@ namespace zmq
// List of sockets that were already closed but not yet deallocated.
// These sockets still have some pipes and I/O objects attached.
- typedef std::list <socket_base_t*> zombies_t;
+ typedef std::vector <socket_base_t*> zombies_t;
zombies_t zombies;
// List of unused slots.
diff --git a/src/object.cpp b/src/object.cpp
index 3296fcf..a8294b0 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -147,11 +147,6 @@ void zmq::object_t::zombify_socket (socket_base_t *socket_)
ctx->zombify_socket (socket_);
}
-void zmq::object_t::dezombify_socket (socket_base_t *socket_)
-{
- ctx->dezombify_socket (socket_);
-}
-
void zmq::object_t::send_stop ()
{
// 'stop' command goes always from administrative thread to
diff --git a/src/object.hpp b/src/object.hpp
index f146b25..e083ce3 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -55,9 +55,6 @@ namespace zmq
// the context.
void zombify_socket (class socket_base_t *socket_);
- // Dezombify particular socket, i.e. destroy it.
- void dezombify_socket (class socket_base_t *socket_);
-
// Derived object can use these functions to send commands
// to other objects.
void send_stop ();
diff --git a/src/own.cpp b/src/own.cpp
index d90e9c4..f8252ab 100644
--- a/src/own.cpp
+++ b/src/own.cpp
@@ -192,7 +192,12 @@ void zmq::own_t::check_term_acks ()
send_term_ack (owner);
// Deallocate the resources.
- delete this;
+ process_destroy ();
}
}
+void zmq::own_t::process_destroy ()
+{
+ delete this;
+}
+
diff --git a/src/own.hpp b/src/own.hpp
index dc14fcc..b65177e 100644
--- a/src/own.hpp
+++ b/src/own.hpp
@@ -85,6 +85,10 @@ namespace zmq
void register_term_acks (int count_);
void unregister_term_ack ();
+ // A place to hook in when phyicallal destruction of the object
+ // is to be delayed.
+ virtual void process_destroy ();
+
private:
// Set owner of the object
@@ -94,7 +98,6 @@ namespace zmq
void process_own (own_t *object_);
void process_term_req (own_t *object_);
void process_term_ack ();
-
void process_seqnum ();
// Check whether all the peding term acks were delivered.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 89b8a29..060480f 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -110,6 +110,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) :
own_t (parent_, slot_),
zombie (false),
+ destroyed (false),
last_processing_time (0),
ticks (0),
rcvmore (false)
@@ -118,15 +119,12 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) :
zmq::socket_base_t::~socket_base_t ()
{
- zmq_assert (zombie);
+ zmq_assert (zombie && destroyed);
// Check whether there are no session leaks.
sessions_sync.lock ();
zmq_assert (sessions.empty ());
sessions_sync.unlock ();
-
- // Mark the socket slot as empty.
- dezombify_socket (this);
}
zmq::signaler_t *zmq::socket_base_t::get_signaler ()
@@ -604,13 +602,21 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_)
return session;
}
-void zmq::socket_base_t::dezombify ()
+bool zmq::socket_base_t::dezombify ()
{
zmq_assert (zombie);
// Process any commands from other threads/sockets that may be available
// at the moment. Ultimately, socket will be destroyed.
process_commands (false, false);
+
+ // If the object was already marked as destroyed, finish the deallocation.
+ if (destroyed) {
+ own_t::process_destroy ();
+ return true;
+ }
+
+ return false;
}
void zmq::socket_base_t::process_commands (bool block_, bool throttle_)
@@ -705,6 +711,11 @@ void zmq::socket_base_t::process_term ()
own_t::process_term ();
}
+void zmq::socket_base_t::process_destroy ()
+{
+ destroyed = true;
+}
+
int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 785967e..1d8c4ff 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -85,8 +85,8 @@ namespace zmq
void terminated (class writer_t *pipe_);
// This function should be called only on zombie sockets. It tries
- // to deallocate the zombie.
- void dezombify ();
+ // to deallocate the zombie. Returns true is object is destroyed.
+ bool dezombify ();
protected:
@@ -120,6 +120,9 @@ namespace zmq
// by overloading it.
void process_term ();
+ // Delay actual destruction of the socket.
+ void process_destroy ();
+
private:
// TODO: Check whether we still need this flag...
@@ -128,6 +131,11 @@ namespace zmq
// attached to the socket.
bool zombie;
+ // If true, object should have been already destroyed. However,
+ // destruction is delayed while we unwind the stack to the point
+ // where it doesn't intersect the object being destroyed.
+ bool destroyed;
+
// Check whether transport protocol, as specified in connect or
// bind, is available and compatible with the socket type.
int check_protocol (const std::string &protocol_);