diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/command.hpp | 5 | ||||
-rw-r--r-- | src/object.cpp | 20 | ||||
-rw-r--r-- | src/object.hpp | 2 | ||||
-rw-r--r-- | src/reaper.cpp | 64 | ||||
-rw-r--r-- | src/reaper.hpp | 13 | ||||
-rw-r--r-- | src/socket_base.cpp | 53 | ||||
-rw-r--r-- | src/socket_base.hpp | 21 |
7 files changed, 106 insertions, 72 deletions
diff --git a/src/command.hpp b/src/command.hpp index ec0850d..d03de2f 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -47,6 +47,7 @@ namespace zmq term, term_ack, reap, + reaped, done } type; @@ -125,6 +126,10 @@ namespace zmq class socket_base_t *socket; } reap; + // Closed socket notifies the reaper that it's already deallocated. + struct { + } reaped; + // Sent by reaper thread to the term thread when all the sockets // are successfully deallocated. struct { diff --git a/src/object.cpp b/src/object.cpp index 823d4b6..9ec73f7 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -118,6 +118,10 @@ void zmq::object_t::process_command (command_t &cmd_) process_reap (cmd_.args.reap.socket); break; + case command_t::reaped: + process_reaped (); + break; + default: zmq_assert (false); } @@ -352,6 +356,17 @@ void zmq::object_t::send_reap (class socket_base_t *socket_) send_command (cmd); } +void zmq::object_t::send_reaped () +{ + command_t cmd; +#if defined ZMQ_MAKE_VALGRIND_HAPPY + memset (&cmd, 0, sizeof (cmd)); +#endif + cmd.destination = ctx->get_reaper (); + cmd.type = command_t::reaped; + send_command (cmd); +} + void zmq::object_t::send_done () { command_t cmd; @@ -430,6 +445,11 @@ void zmq::object_t::process_reap (class socket_base_t *socket_) zmq_assert (false); } +void zmq::object_t::process_reaped () +{ + zmq_assert (false); +} + void zmq::object_t::process_seqnum () { zmq_assert (false); diff --git a/src/object.hpp b/src/object.hpp index cee82c8..748a339 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -80,6 +80,7 @@ namespace zmq void send_term (class own_t *destination_, int linger_); void send_term_ack (class own_t *destination_); void send_reap (class socket_base_t *socket_); + void send_reaped (); void send_done (); // These handlers can be overloaded by the derived objects. They are @@ -99,6 +100,7 @@ namespace zmq virtual void process_term (int linger_); virtual void process_term_ack (); virtual void process_reap (class socket_base_t *socket_); + virtual void process_reaped (); // Special handler called after a command that requires a seqnum // was processed. The implementation should catch up with its counter diff --git a/src/reaper.cpp b/src/reaper.cpp index f114f56..b1e2796 100644 --- a/src/reaper.cpp +++ b/src/reaper.cpp @@ -23,8 +23,8 @@ zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) : object_t (ctx_, tid_), - terminating (false), - has_timer (false) + sockets (0), + terminating (false) { poller = new (std::nothrow) poller_t; zmq_assert (poller); @@ -74,55 +74,20 @@ void zmq::reaper_t::in_event () void zmq::reaper_t::out_event () { - // We are never polling for POLLOUT here. This function is never called. zmq_assert (false); } void zmq::reaper_t::timer_event (int id_) { - zmq_assert (has_timer); - has_timer = false; - reap (); -} - -void zmq::reaper_t::reap () -{ - // Try to reap each socket in the list. - for (sockets_t::iterator it = sockets.begin (); it != sockets.end ();) { - if ((*it)->reap ()) { - - // MSVC version of STL requires this to be done a spacial way... -#if defined _MSC_VER - it = sockets.erase (it); -#else - sockets.erase (it); -#endif - } - else - ++it; - } - - // If there are still sockets to reap, wait a while, then try again. - if (!sockets.empty () && !has_timer) { - poller->add_timer (1 , this, 0); - has_timer = true; - return; - } - - // No more sockets and the context is already shutting down. - if (terminating) { - send_done (); - poller->rm_fd (mailbox_handle); - poller->stop (); - return; - } + zmq_assert (false); } void zmq::reaper_t::process_stop () { terminating = true; - if (sockets.empty ()) { + // If there are no sockets beig reaped finish immediately. + if (!sockets) { send_done (); poller->rm_fd (mailbox_handle); poller->stop (); @@ -133,7 +98,22 @@ void zmq::reaper_t::process_reap (socket_base_t *socket_) { // Start termination of associated I/O object hierarchy. socket_->terminate (); - sockets.push_back (socket_); - reap (); + + // Add the socket to the poller. + socket_->start_reaping (poller); + + ++sockets; } +void zmq::reaper_t::process_reaped () +{ + --sockets; + + // If reaped was already asked to terminate and there are no more sockets, + // finish immediately. + if (!sockets && terminating) { + send_done (); + poller->rm_fd (mailbox_handle); + poller->stop (); + } +} diff --git a/src/reaper.hpp b/src/reaper.hpp index 8598cd9..6536b74 100644 --- a/src/reaper.hpp +++ b/src/reaper.hpp @@ -47,15 +47,10 @@ namespace zmq private: - void reap (); - // Command handlers. void process_stop (); void process_reap (class socket_base_t *socket_); - - // List of all sockets being terminated. - typedef std::vector <class socket_base_t*> sockets_t; - sockets_t sockets; + void process_reaped (); // Reaper thread accesses incoming commands via this mailbox. mailbox_t mailbox; @@ -66,12 +61,12 @@ namespace zmq // I/O multiplexing is performed using a poller object. poller_t *poller; + // Number of sockets being reaped at the moment. + int sockets; + // If true, we were already asked to terminate. bool terminating; - // If true, timer till next reaping is running. - bool has_timer; - reaper_t (const reaper_t&); const reaper_t &operator = (const reaper_t&); }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 0643d4d..f15ab9d 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -624,24 +624,11 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_) return session; } -bool zmq::socket_base_t::reap () +void zmq::socket_base_t::start_reaping (poller_t *poller_) { - // Process any commands from other threads/sockets that may be available - // at the moment. Ultimately, socket will be destroyed. - process_commands (false, false); - - // If the object was already marked as destroyed, finish the deallocation. - if (destroyed) { - - // Remove the socket from the context. - destroy_socket (this); - - // Deallocate. - own_t::process_destroy (); - return true; - } - - return false; + poller = poller_; + handle = poller->add_fd (mailbox.get_fd (), this); + poller->set_pollin (handle); } int zmq::socket_base_t::process_commands (bool block_, bool throttle_) @@ -762,3 +749,35 @@ int zmq::socket_base_t::xrecv (zmq_msg_t *msg_, int options_) return -1; } +void zmq::socket_base_t::in_event () +{ + // Process any commands from other threads/sockets that may be available + // at the moment. Ultimately, socket will be destroyed. + process_commands (false, false); + + // If the object was already marked as destroyed, finish the deallocation. + if (destroyed) { + + // Remove the socket from the reaper's poller. + poller->rm_fd (handle); + + // Remove the socket from the context. + destroy_socket (this); + + // Notify the reaper about the fact. + send_reaped (); + + // Deallocate. + own_t::process_destroy (); + } +} + +void zmq::socket_base_t::out_event () +{ + zmq_assert (false); +} + +void zmq::socket_base_t::timer_event (int id_) +{ + zmq_assert (false); +} diff --git a/src/socket_base.hpp b/src/socket_base.hpp index a74b7d0..cea5bc8 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -29,7 +29,9 @@ #include "array.hpp" #include "mutex.hpp" #include "stdint.hpp" +#include "poller.hpp" #include "atomic_counter.hpp" +#include "i_poll_events.hpp" #include "mailbox.hpp" #include "stdint.hpp" #include "blob.hpp" @@ -40,7 +42,8 @@ namespace zmq class socket_base_t : public own_t, - public array_item_t + public array_item_t, + public i_poll_events { friend class reaper_t; @@ -84,9 +87,15 @@ namespace zmq void activated (class writer_t *pipe_); void terminated (class writer_t *pipe_); - // This function should be called only on sockets that are already - // closed -- from the reaper thread. It tries to finalise the socket. - bool reap (); + // Using this function reaper thread ask the socket to regiter with + // its poller. + void start_reaping (poller_t *poller_); + + // i_poll_events implementation. This interface is used when socket + // is handled by the poller in the reaper thread. + void in_event (); + void out_event (); + void timer_event (int id_); protected: @@ -157,6 +166,10 @@ namespace zmq // Socket's mailbox object. mailbox_t mailbox; + // Reaper's poller and handle of this socket within it. + poller_t *poller; + poller_t::handle_t handle; + // Timestamp of when commands were processed the last time. uint64_t last_tsc; |