diff options
| -rw-r--r-- | src/command.hpp | 5 | ||||
| -rw-r--r-- | src/object.cpp | 20 | ||||
| -rw-r--r-- | src/object.hpp | 2 | ||||
| -rw-r--r-- | src/reaper.cpp | 64 | ||||
| -rw-r--r-- | src/reaper.hpp | 13 | ||||
| -rw-r--r-- | src/socket_base.cpp | 53 | ||||
| -rw-r--r-- | src/socket_base.hpp | 21 | 
7 files changed, 106 insertions, 72 deletions
| diff --git a/src/command.hpp b/src/command.hpp index ec0850d..d03de2f 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -47,6 +47,7 @@ namespace zmq              term,              term_ack,              reap, +            reaped,              done          } type; @@ -125,6 +126,10 @@ namespace zmq                  class socket_base_t *socket;              } reap; +            //  Closed socket notifies the reaper that it's already deallocated. +            struct { +            } reaped; +              //  Sent by reaper thread to the term thread when all the sockets              //  are successfully deallocated.              struct { diff --git a/src/object.cpp b/src/object.cpp index 823d4b6..9ec73f7 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -118,6 +118,10 @@ void zmq::object_t::process_command (command_t &cmd_)          process_reap (cmd_.args.reap.socket);          break; +    case command_t::reaped: +        process_reaped (); +        break; +      default:          zmq_assert (false);      } @@ -352,6 +356,17 @@ void zmq::object_t::send_reap (class socket_base_t *socket_)      send_command (cmd);  } +void zmq::object_t::send_reaped () +{ +    command_t cmd; +#if defined ZMQ_MAKE_VALGRIND_HAPPY +    memset (&cmd, 0, sizeof (cmd)); +#endif +    cmd.destination = ctx->get_reaper (); +    cmd.type = command_t::reaped; +    send_command (cmd); +} +  void zmq::object_t::send_done ()  {      command_t cmd; @@ -430,6 +445,11 @@ void zmq::object_t::process_reap (class socket_base_t *socket_)      zmq_assert (false);  } +void zmq::object_t::process_reaped () +{ +    zmq_assert (false); +} +  void zmq::object_t::process_seqnum ()  {      zmq_assert (false); diff --git a/src/object.hpp b/src/object.hpp index cee82c8..748a339 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -80,6 +80,7 @@ namespace zmq          void send_term (class own_t *destination_, int linger_);          void send_term_ack (class own_t *destination_);          void send_reap (class socket_base_t *socket_); +        void send_reaped ();          void send_done ();          //  These handlers can be overloaded by the derived objects. They are @@ -99,6 +100,7 @@ namespace zmq          virtual void process_term (int linger_);          virtual void process_term_ack ();          virtual void process_reap (class socket_base_t *socket_); +        virtual void process_reaped ();          //  Special handler called after a command that requires a seqnum          //  was processed. The implementation should catch up with its counter diff --git a/src/reaper.cpp b/src/reaper.cpp index f114f56..b1e2796 100644 --- a/src/reaper.cpp +++ b/src/reaper.cpp @@ -23,8 +23,8 @@  zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :      object_t (ctx_, tid_), -    terminating (false), -    has_timer (false) +    sockets (0), +    terminating (false)  {      poller = new (std::nothrow) poller_t;      zmq_assert (poller); @@ -74,55 +74,20 @@ void zmq::reaper_t::in_event ()  void zmq::reaper_t::out_event ()  { -    //  We are never polling for POLLOUT here. This function is never called.      zmq_assert (false);  }  void zmq::reaper_t::timer_event (int id_)  { -    zmq_assert (has_timer); -    has_timer = false; -    reap (); -} - -void zmq::reaper_t::reap () -{ -    //  Try to reap each socket in the list. -    for (sockets_t::iterator it = sockets.begin (); it != sockets.end ();) { -        if ((*it)->reap ()) { - -            //  MSVC version of STL requires this to be done a spacial way... -#if defined _MSC_VER -            it = sockets.erase (it); -#else -            sockets.erase (it); -#endif -        } -        else -            ++it; -    } - -    //  If there are still sockets to reap, wait a while, then try again. -    if (!sockets.empty () && !has_timer) { -        poller->add_timer (1 , this, 0); -        has_timer = true; -        return; -    } - -    //  No more sockets and the context is already shutting down. -    if (terminating) { -        send_done (); -        poller->rm_fd (mailbox_handle); -        poller->stop (); -        return; -    } +    zmq_assert (false);  }  void zmq::reaper_t::process_stop ()  {      terminating = true; -    if (sockets.empty ()) { +    //  If there are no sockets beig reaped finish immediately. +    if (!sockets) {          send_done ();          poller->rm_fd (mailbox_handle);          poller->stop (); @@ -133,7 +98,22 @@ void zmq::reaper_t::process_reap (socket_base_t *socket_)  {      //  Start termination of associated I/O object hierarchy.      socket_->terminate (); -    sockets.push_back (socket_); -    reap (); + +    //  Add the socket to the poller. +    socket_->start_reaping (poller); + +    ++sockets;  } +void zmq::reaper_t::process_reaped () +{ +    --sockets; + +    //  If reaped was already asked to terminate and there are no more sockets, +    //  finish immediately. +    if (!sockets && terminating) { +        send_done (); +        poller->rm_fd (mailbox_handle); +        poller->stop (); +    } +} diff --git a/src/reaper.hpp b/src/reaper.hpp index 8598cd9..6536b74 100644 --- a/src/reaper.hpp +++ b/src/reaper.hpp @@ -47,15 +47,10 @@ namespace zmq      private: -        void reap (); -          //  Command handlers.          void process_stop ();          void process_reap (class socket_base_t *socket_); - -        //  List of all sockets being terminated. -        typedef std::vector <class socket_base_t*> sockets_t; -        sockets_t sockets; +        void process_reaped ();          //  Reaper thread accesses incoming commands via this mailbox.          mailbox_t mailbox; @@ -66,12 +61,12 @@ namespace zmq          //  I/O multiplexing is performed using a poller object.          poller_t *poller; +        //  Number of sockets being reaped at the moment. +        int sockets; +          //  If true, we were already asked to terminate.          bool terminating; -        //  If true, timer till next reaping is running. -        bool has_timer; -          reaper_t (const reaper_t&);          const reaper_t &operator = (const reaper_t&);      }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 0643d4d..f15ab9d 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -624,24 +624,11 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)      return session;      } -bool zmq::socket_base_t::reap () +void zmq::socket_base_t::start_reaping (poller_t *poller_)  { -    //  Process any commands from other threads/sockets that may be available -    //  at the moment. Ultimately, socket will be destroyed. -    process_commands (false, false); - -    //  If the object was already marked as destroyed, finish the deallocation. -    if (destroyed) { - -        //  Remove the socket from the context. -        destroy_socket (this); - -        //  Deallocate. -        own_t::process_destroy (); -        return true; -    } - -    return false; +    poller = poller_; +    handle = poller->add_fd (mailbox.get_fd (), this); +    poller->set_pollin (handle);  }  int zmq::socket_base_t::process_commands (bool block_, bool throttle_) @@ -762,3 +749,35 @@ int zmq::socket_base_t::xrecv (zmq_msg_t *msg_, int options_)      return -1;  } +void zmq::socket_base_t::in_event () +{ +    //  Process any commands from other threads/sockets that may be available +    //  at the moment. Ultimately, socket will be destroyed. +    process_commands (false, false); + +    //  If the object was already marked as destroyed, finish the deallocation. +    if (destroyed) { + +        //  Remove the socket from the reaper's poller. +        poller->rm_fd (handle); + +        //  Remove the socket from the context. +        destroy_socket (this); + +        //  Notify the reaper about the fact. +        send_reaped (); + +        //  Deallocate. +        own_t::process_destroy (); +    } +} + +void zmq::socket_base_t::out_event () +{ +    zmq_assert (false); +} + +void zmq::socket_base_t::timer_event (int id_) +{ +    zmq_assert (false); +} diff --git a/src/socket_base.hpp b/src/socket_base.hpp index a74b7d0..cea5bc8 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -29,7 +29,9 @@  #include "array.hpp"  #include "mutex.hpp"  #include "stdint.hpp" +#include "poller.hpp"  #include "atomic_counter.hpp" +#include "i_poll_events.hpp"  #include "mailbox.hpp"  #include "stdint.hpp"  #include "blob.hpp" @@ -40,7 +42,8 @@ namespace zmq      class socket_base_t :          public own_t, -        public array_item_t +        public array_item_t, +        public i_poll_events      {          friend class reaper_t; @@ -84,9 +87,15 @@ namespace zmq          void activated (class writer_t *pipe_);          void terminated (class writer_t *pipe_); -        //  This function should be called only on sockets that are already -        //  closed -- from the reaper thread. It tries to finalise the socket. -        bool reap (); +        //  Using this function reaper thread ask the socket to regiter with +        //  its poller. +        void start_reaping (poller_t *poller_); + +        //  i_poll_events implementation. This interface is used when socket +        //  is handled by the poller in the reaper thread. +        void in_event (); +        void out_event (); +        void timer_event (int id_);      protected: @@ -157,6 +166,10 @@ namespace zmq          //  Socket's mailbox object.          mailbox_t mailbox; +        //  Reaper's poller and handle of this socket within it. +        poller_t *poller; +        poller_t::handle_t handle; +          //  Timestamp of when commands were processed the last time.          uint64_t last_tsc; | 
