diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ctx.cpp | 46 | ||||
| -rw-r--r-- | src/ctx.hpp | 12 | ||||
| -rw-r--r-- | src/object.cpp | 5 | ||||
| -rw-r--r-- | src/object.hpp | 3 | ||||
| -rw-r--r-- | src/socket_base.cpp | 10 | ||||
| -rw-r--r-- | src/socket_base.hpp | 4 | ||||
| -rw-r--r-- | src/zmq.cpp | 2 | 
7 files changed, 59 insertions, 23 deletions
| diff --git a/src/ctx.cpp b/src/ctx.cpp index d096b91..a958833 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -97,7 +97,7 @@ zmq::ctx_t::~ctx_t ()  #endif  } -int zmq::ctx_t::term () +int zmq::ctx_t::terminate ()  {      //  First send stop command to sockets so that any      //  blocking calls are interrupted. @@ -115,12 +115,15 @@ int zmq::ctx_t::term ()      if (no_sockets_notify)          no_sockets_sync.wait (); -    //  At this point there's only one application thread (this one) remaining. -    //  We don't even have to synchronise access to data. +    //  At this point there should be no active sockets. What we have is a set +    //  of zombies waiting to be dezombified.      zmq_assert (sockets.empty ()); -//  TODO: We are accessing the list of zombies in unsynchronised manner here! -    //  Get rid of remaining zombie sockets. +    //  Get rid of remaining zombie sockets. Note that the lock won't block +    //  anyone here. There's noone else having open sockets anyway. The only +    //  purpose of the lock is to double-check all the CPU caches have been +    //  synchronised. +    slot_sync.lock ();      while (!zombies.empty ()) {          dezombify (); @@ -134,6 +137,7 @@ int zmq::ctx_t::term ()          usleep (1000);  #endif      } +    slot_sync.unlock ();      //  Deallocate the resources.      delete this; @@ -197,6 +201,22 @@ void zmq::ctx_t::zombify_socket (socket_base_t *socket_)      slot_sync.unlock ();  } +void zmq::ctx_t::dezombify_socket (socket_base_t *socket_) +{ +    //  We assume that this function is called only within dezombification +    //  process, which in turn is running within a slot_sync critical section. +    //  Therefore, we need no locking here. + +    //  TODO: Can we do this better than O(n)? +    zombies_t::iterator it = std::find (zombies.begin (), zombies.end (), +        socket_); +    zmq_assert (it != zombies.end ()); + +    //  Move from the slot from 'zombie' to 'empty' state. +    empty_slots.push_back ((*it)->get_slot ()); +    zombies.erase (it); +} +  void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_)  {      slots [slot_]->send (command_); @@ -287,12 +307,14 @@ void zmq::ctx_t::dezombify ()  {      //  Try to dezombify each zombie in the list. Note that caller is      //  responsible for calling this method in the slot_sync critical section. -    for (zombies_t::size_type i = 0; i != zombies.size ();) -        if (zombies [i]->dezombify ()) { -            empty_slots.push_back (zombies [i]->get_slot ()); -            zombies.erase (zombies [i]); -        } -        else -            i++; +    zombies_t::iterator it = zombies.begin (); +    while (it != zombies.end ()) { +        zombies_t::iterator old = it; +        ++it; + +        //  dezombify_socket can be called here that will invalidate +        //  the iterator. That's why we've got the next zombie beforehand. +        (*old)->dezombify (); +    }  } diff --git a/src/ctx.hpp b/src/ctx.hpp index c44cca6..1b51151 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -20,9 +20,10 @@  #ifndef __ZMQ_CTX_HPP_INCLUDED__  #define __ZMQ_CTX_HPP_INCLUDED__ -#include <vector>  #include <set>  #include <map> +#include <list> +#include <vector>  #include <string>  #include "signaler.hpp" @@ -52,7 +53,7 @@ namespace zmq          //  no more sockets open it'll cause all the infrastructure to be shut          //  down. If there are open sockets still, the deallocation happens          //  after the last one is closed. -        int term (); +        int terminate ();          //  Create a socket.          class socket_base_t *create_socket (int type_); @@ -60,6 +61,9 @@ namespace zmq          //  Make socket a zombie.          void zombify_socket (socket_base_t *socket_); +        //  Kill the zombie socket. +        void dezombify_socket (socket_base_t *socket_); +          //  Send command to the destination slot.          void send_command (uint32_t slot_, const command_t &command_); @@ -83,9 +87,9 @@ namespace zmq          typedef yarray_t <socket_base_t> sockets_t;          sockets_t sockets; -        //  Array of sockets that were already closed but not yet deallocated. +        //  List of sockets that were already closed but not yet deallocated.          //  These sockets still have some pipes and I/O objects attached. -        typedef yarray_t <socket_base_t> zombies_t; +        typedef std::list <socket_base_t*> zombies_t;          zombies_t zombies;          //  List of unused slots. diff --git a/src/object.cpp b/src/object.cpp index a8294b0..3296fcf 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -147,6 +147,11 @@ void zmq::object_t::zombify_socket (socket_base_t *socket_)      ctx->zombify_socket (socket_);  } +void zmq::object_t::dezombify_socket (socket_base_t *socket_) +{ +    ctx->dezombify_socket (socket_); +} +  void zmq::object_t::send_stop ()  {      //  'stop' command goes always from administrative thread to diff --git a/src/object.hpp b/src/object.hpp index e083ce3..f146b25 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -55,6 +55,9 @@ namespace zmq          //  the context.          void zombify_socket (class socket_base_t *socket_); +        //  Dezombify particular socket, i.e. destroy it. +        void dezombify_socket (class socket_base_t *socket_); +          //  Derived object can use these functions to send commands          //  to other objects.          void send_stop (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 903e781..89b8a29 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -118,10 +118,15 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) :  zmq::socket_base_t::~socket_base_t ()  { +    zmq_assert (zombie); +      //  Check whether there are no session leaks.      sessions_sync.lock ();      zmq_assert (sessions.empty ());      sessions_sync.unlock (); + +    //  Mark the socket slot as empty. +    dezombify_socket (this);  }  zmq::signaler_t *zmq::socket_base_t::get_signaler () @@ -599,16 +604,13 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_)      return session;      } -bool zmq::socket_base_t::dezombify () +void zmq::socket_base_t::dezombify ()  {      zmq_assert (zombie);      //  Process any commands from other threads/sockets that may be available      //  at the moment. Ultimately, socket will be destroyed.      process_commands (false, false); - -//  TODO: ??? -    return true;  }  void zmq::socket_base_t::process_commands (bool block_, bool throttle_) diff --git a/src/socket_base.hpp b/src/socket_base.hpp index f76dc4c..785967e 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -85,8 +85,8 @@ namespace zmq          void terminated (class writer_t *pipe_);          //  This function should be called only on zombie sockets. It tries -        //  to deallocate the zombie. Returns true if zombie is finally dead. -        bool dezombify (); +        //  to deallocate the zombie. +        void dezombify ();      protected: diff --git a/src/zmq.cpp b/src/zmq.cpp index 1a74f86..e9bfc53 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -269,7 +269,7 @@ void *zmq_init (int io_threads_)  int zmq_term (void *ctx_)  { -    int rc = ((zmq::ctx_t*) ctx_)->term (); +    int rc = ((zmq::ctx_t*) ctx_)->terminate ();      int en = errno;      if (!ctx_) { | 
