summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-08-12 08:16:18 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-08-25 15:39:20 +0200
commit936dbf956b0f1471a96fc06bcba67765257dbc4a (patch)
treeb23704ec1d4d6f8c6c94e55919fcfcc1d0f26d6a /src
parent76bd6e73c335dbebd8bd30565f83a810058f2cc8 (diff)
dezombification procedure fixed
Diffstat (limited to 'src')
-rw-r--r--src/ctx.cpp46
-rw-r--r--src/ctx.hpp12
-rw-r--r--src/object.cpp5
-rw-r--r--src/object.hpp3
-rw-r--r--src/socket_base.cpp10
-rw-r--r--src/socket_base.hpp4
-rw-r--r--src/zmq.cpp2
7 files changed, 59 insertions, 23 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp
index d096b91..a958833 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -97,7 +97,7 @@ zmq::ctx_t::~ctx_t ()
#endif
}
-int zmq::ctx_t::term ()
+int zmq::ctx_t::terminate ()
{
// First send stop command to sockets so that any
// blocking calls are interrupted.
@@ -115,12 +115,15 @@ int zmq::ctx_t::term ()
if (no_sockets_notify)
no_sockets_sync.wait ();
- // At this point there's only one application thread (this one) remaining.
- // We don't even have to synchronise access to data.
+ // At this point there should be no active sockets. What we have is a set
+ // of zombies waiting to be dezombified.
zmq_assert (sockets.empty ());
-// TODO: We are accessing the list of zombies in unsynchronised manner here!
- // Get rid of remaining zombie sockets.
+ // Get rid of remaining zombie sockets. Note that the lock won't block
+ // anyone here. There's noone else having open sockets anyway. The only
+ // purpose of the lock is to double-check all the CPU caches have been
+ // synchronised.
+ slot_sync.lock ();
while (!zombies.empty ()) {
dezombify ();
@@ -134,6 +137,7 @@ int zmq::ctx_t::term ()
usleep (1000);
#endif
}
+ slot_sync.unlock ();
// Deallocate the resources.
delete this;
@@ -197,6 +201,22 @@ void zmq::ctx_t::zombify_socket (socket_base_t *socket_)
slot_sync.unlock ();
}
+void zmq::ctx_t::dezombify_socket (socket_base_t *socket_)
+{
+ // We assume that this function is called only within dezombification
+ // process, which in turn is running within a slot_sync critical section.
+ // Therefore, we need no locking here.
+
+ // TODO: Can we do this better than O(n)?
+ zombies_t::iterator it = std::find (zombies.begin (), zombies.end (),
+ socket_);
+ zmq_assert (it != zombies.end ());
+
+ // Move from the slot from 'zombie' to 'empty' state.
+ empty_slots.push_back ((*it)->get_slot ());
+ zombies.erase (it);
+}
+
void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_)
{
slots [slot_]->send (command_);
@@ -287,12 +307,14 @@ void zmq::ctx_t::dezombify ()
{
// Try to dezombify each zombie in the list. Note that caller is
// responsible for calling this method in the slot_sync critical section.
- for (zombies_t::size_type i = 0; i != zombies.size ();)
- if (zombies [i]->dezombify ()) {
- empty_slots.push_back (zombies [i]->get_slot ());
- zombies.erase (zombies [i]);
- }
- else
- i++;
+ zombies_t::iterator it = zombies.begin ();
+ while (it != zombies.end ()) {
+ zombies_t::iterator old = it;
+ ++it;
+
+ // dezombify_socket can be called here that will invalidate
+ // the iterator. That's why we've got the next zombie beforehand.
+ (*old)->dezombify ();
+ }
}
diff --git a/src/ctx.hpp b/src/ctx.hpp
index c44cca6..1b51151 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -20,9 +20,10 @@
#ifndef __ZMQ_CTX_HPP_INCLUDED__
#define __ZMQ_CTX_HPP_INCLUDED__
-#include <vector>
#include <set>
#include <map>
+#include <list>
+#include <vector>
#include <string>
#include "signaler.hpp"
@@ -52,7 +53,7 @@ namespace zmq
// no more sockets open it'll cause all the infrastructure to be shut
// down. If there are open sockets still, the deallocation happens
// after the last one is closed.
- int term ();
+ int terminate ();
// Create a socket.
class socket_base_t *create_socket (int type_);
@@ -60,6 +61,9 @@ namespace zmq
// Make socket a zombie.
void zombify_socket (socket_base_t *socket_);
+ // Kill the zombie socket.
+ void dezombify_socket (socket_base_t *socket_);
+
// Send command to the destination slot.
void send_command (uint32_t slot_, const command_t &command_);
@@ -83,9 +87,9 @@ namespace zmq
typedef yarray_t <socket_base_t> sockets_t;
sockets_t sockets;
- // Array of sockets that were already closed but not yet deallocated.
+ // List of sockets that were already closed but not yet deallocated.
// These sockets still have some pipes and I/O objects attached.
- typedef yarray_t <socket_base_t> zombies_t;
+ typedef std::list <socket_base_t*> zombies_t;
zombies_t zombies;
// List of unused slots.
diff --git a/src/object.cpp b/src/object.cpp
index a8294b0..3296fcf 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -147,6 +147,11 @@ void zmq::object_t::zombify_socket (socket_base_t *socket_)
ctx->zombify_socket (socket_);
}
+void zmq::object_t::dezombify_socket (socket_base_t *socket_)
+{
+ ctx->dezombify_socket (socket_);
+}
+
void zmq::object_t::send_stop ()
{
// 'stop' command goes always from administrative thread to
diff --git a/src/object.hpp b/src/object.hpp
index e083ce3..f146b25 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -55,6 +55,9 @@ namespace zmq
// the context.
void zombify_socket (class socket_base_t *socket_);
+ // Dezombify particular socket, i.e. destroy it.
+ void dezombify_socket (class socket_base_t *socket_);
+
// Derived object can use these functions to send commands
// to other objects.
void send_stop ();
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 903e781..89b8a29 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -118,10 +118,15 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) :
zmq::socket_base_t::~socket_base_t ()
{
+ zmq_assert (zombie);
+
// Check whether there are no session leaks.
sessions_sync.lock ();
zmq_assert (sessions.empty ());
sessions_sync.unlock ();
+
+ // Mark the socket slot as empty.
+ dezombify_socket (this);
}
zmq::signaler_t *zmq::socket_base_t::get_signaler ()
@@ -599,16 +604,13 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_)
return session;
}
-bool zmq::socket_base_t::dezombify ()
+void zmq::socket_base_t::dezombify ()
{
zmq_assert (zombie);
// Process any commands from other threads/sockets that may be available
// at the moment. Ultimately, socket will be destroyed.
process_commands (false, false);
-
-// TODO: ???
- return true;
}
void zmq::socket_base_t::process_commands (bool block_, bool throttle_)
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index f76dc4c..785967e 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -85,8 +85,8 @@ namespace zmq
void terminated (class writer_t *pipe_);
// This function should be called only on zombie sockets. It tries
- // to deallocate the zombie. Returns true if zombie is finally dead.
- bool dezombify ();
+ // to deallocate the zombie.
+ void dezombify ();
protected:
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 1a74f86..e9bfc53 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -269,7 +269,7 @@ void *zmq_init (int io_threads_)
int zmq_term (void *ctx_)
{
- int rc = ((zmq::ctx_t*) ctx_)->term ();
+ int rc = ((zmq::ctx_t*) ctx_)->terminate ();
int en = errno;
if (!ctx_) {