summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/command.hpp5
-rw-r--r--src/object.cpp20
-rw-r--r--src/object.hpp2
-rw-r--r--src/reaper.cpp64
-rw-r--r--src/reaper.hpp13
-rw-r--r--src/socket_base.cpp53
-rw-r--r--src/socket_base.hpp21
7 files changed, 106 insertions, 72 deletions
diff --git a/src/command.hpp b/src/command.hpp
index ec0850d..d03de2f 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -47,6 +47,7 @@ namespace zmq
term,
term_ack,
reap,
+ reaped,
done
} type;
@@ -125,6 +126,10 @@ namespace zmq
class socket_base_t *socket;
} reap;
+ // Closed socket notifies the reaper that it's already deallocated.
+ struct {
+ } reaped;
+
// Sent by reaper thread to the term thread when all the sockets
// are successfully deallocated.
struct {
diff --git a/src/object.cpp b/src/object.cpp
index 823d4b6..9ec73f7 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -118,6 +118,10 @@ void zmq::object_t::process_command (command_t &cmd_)
process_reap (cmd_.args.reap.socket);
break;
+ case command_t::reaped:
+ process_reaped ();
+ break;
+
default:
zmq_assert (false);
}
@@ -352,6 +356,17 @@ void zmq::object_t::send_reap (class socket_base_t *socket_)
send_command (cmd);
}
+void zmq::object_t::send_reaped ()
+{
+ command_t cmd;
+#if defined ZMQ_MAKE_VALGRIND_HAPPY
+ memset (&cmd, 0, sizeof (cmd));
+#endif
+ cmd.destination = ctx->get_reaper ();
+ cmd.type = command_t::reaped;
+ send_command (cmd);
+}
+
void zmq::object_t::send_done ()
{
command_t cmd;
@@ -430,6 +445,11 @@ void zmq::object_t::process_reap (class socket_base_t *socket_)
zmq_assert (false);
}
+void zmq::object_t::process_reaped ()
+{
+ zmq_assert (false);
+}
+
void zmq::object_t::process_seqnum ()
{
zmq_assert (false);
diff --git a/src/object.hpp b/src/object.hpp
index cee82c8..748a339 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -80,6 +80,7 @@ namespace zmq
void send_term (class own_t *destination_, int linger_);
void send_term_ack (class own_t *destination_);
void send_reap (class socket_base_t *socket_);
+ void send_reaped ();
void send_done ();
// These handlers can be overloaded by the derived objects. They are
@@ -99,6 +100,7 @@ namespace zmq
virtual void process_term (int linger_);
virtual void process_term_ack ();
virtual void process_reap (class socket_base_t *socket_);
+ virtual void process_reaped ();
// Special handler called after a command that requires a seqnum
// was processed. The implementation should catch up with its counter
diff --git a/src/reaper.cpp b/src/reaper.cpp
index f114f56..b1e2796 100644
--- a/src/reaper.cpp
+++ b/src/reaper.cpp
@@ -23,8 +23,8 @@
zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_),
- terminating (false),
- has_timer (false)
+ sockets (0),
+ terminating (false)
{
poller = new (std::nothrow) poller_t;
zmq_assert (poller);
@@ -74,55 +74,20 @@ void zmq::reaper_t::in_event ()
void zmq::reaper_t::out_event ()
{
- // We are never polling for POLLOUT here. This function is never called.
zmq_assert (false);
}
void zmq::reaper_t::timer_event (int id_)
{
- zmq_assert (has_timer);
- has_timer = false;
- reap ();
-}
-
-void zmq::reaper_t::reap ()
-{
- // Try to reap each socket in the list.
- for (sockets_t::iterator it = sockets.begin (); it != sockets.end ();) {
- if ((*it)->reap ()) {
-
- // MSVC version of STL requires this to be done a spacial way...
-#if defined _MSC_VER
- it = sockets.erase (it);
-#else
- sockets.erase (it);
-#endif
- }
- else
- ++it;
- }
-
- // If there are still sockets to reap, wait a while, then try again.
- if (!sockets.empty () && !has_timer) {
- poller->add_timer (1 , this, 0);
- has_timer = true;
- return;
- }
-
- // No more sockets and the context is already shutting down.
- if (terminating) {
- send_done ();
- poller->rm_fd (mailbox_handle);
- poller->stop ();
- return;
- }
+ zmq_assert (false);
}
void zmq::reaper_t::process_stop ()
{
terminating = true;
- if (sockets.empty ()) {
+ // If there are no sockets beig reaped finish immediately.
+ if (!sockets) {
send_done ();
poller->rm_fd (mailbox_handle);
poller->stop ();
@@ -133,7 +98,22 @@ void zmq::reaper_t::process_reap (socket_base_t *socket_)
{
// Start termination of associated I/O object hierarchy.
socket_->terminate ();
- sockets.push_back (socket_);
- reap ();
+
+ // Add the socket to the poller.
+ socket_->start_reaping (poller);
+
+ ++sockets;
}
+void zmq::reaper_t::process_reaped ()
+{
+ --sockets;
+
+ // If reaped was already asked to terminate and there are no more sockets,
+ // finish immediately.
+ if (!sockets && terminating) {
+ send_done ();
+ poller->rm_fd (mailbox_handle);
+ poller->stop ();
+ }
+}
diff --git a/src/reaper.hpp b/src/reaper.hpp
index 8598cd9..6536b74 100644
--- a/src/reaper.hpp
+++ b/src/reaper.hpp
@@ -47,15 +47,10 @@ namespace zmq
private:
- void reap ();
-
// Command handlers.
void process_stop ();
void process_reap (class socket_base_t *socket_);
-
- // List of all sockets being terminated.
- typedef std::vector <class socket_base_t*> sockets_t;
- sockets_t sockets;
+ void process_reaped ();
// Reaper thread accesses incoming commands via this mailbox.
mailbox_t mailbox;
@@ -66,12 +61,12 @@ namespace zmq
// I/O multiplexing is performed using a poller object.
poller_t *poller;
+ // Number of sockets being reaped at the moment.
+ int sockets;
+
// If true, we were already asked to terminate.
bool terminating;
- // If true, timer till next reaping is running.
- bool has_timer;
-
reaper_t (const reaper_t&);
const reaper_t &operator = (const reaper_t&);
};
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 0643d4d..f15ab9d 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -624,24 +624,11 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)
return session;
}
-bool zmq::socket_base_t::reap ()
+void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
- // Process any commands from other threads/sockets that may be available
- // at the moment. Ultimately, socket will be destroyed.
- process_commands (false, false);
-
- // If the object was already marked as destroyed, finish the deallocation.
- if (destroyed) {
-
- // Remove the socket from the context.
- destroy_socket (this);
-
- // Deallocate.
- own_t::process_destroy ();
- return true;
- }
-
- return false;
+ poller = poller_;
+ handle = poller->add_fd (mailbox.get_fd (), this);
+ poller->set_pollin (handle);
}
int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
@@ -762,3 +749,35 @@ int zmq::socket_base_t::xrecv (zmq_msg_t *msg_, int options_)
return -1;
}
+void zmq::socket_base_t::in_event ()
+{
+ // Process any commands from other threads/sockets that may be available
+ // at the moment. Ultimately, socket will be destroyed.
+ process_commands (false, false);
+
+ // If the object was already marked as destroyed, finish the deallocation.
+ if (destroyed) {
+
+ // Remove the socket from the reaper's poller.
+ poller->rm_fd (handle);
+
+ // Remove the socket from the context.
+ destroy_socket (this);
+
+ // Notify the reaper about the fact.
+ send_reaped ();
+
+ // Deallocate.
+ own_t::process_destroy ();
+ }
+}
+
+void zmq::socket_base_t::out_event ()
+{
+ zmq_assert (false);
+}
+
+void zmq::socket_base_t::timer_event (int id_)
+{
+ zmq_assert (false);
+}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index a74b7d0..cea5bc8 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -29,7 +29,9 @@
#include "array.hpp"
#include "mutex.hpp"
#include "stdint.hpp"
+#include "poller.hpp"
#include "atomic_counter.hpp"
+#include "i_poll_events.hpp"
#include "mailbox.hpp"
#include "stdint.hpp"
#include "blob.hpp"
@@ -40,7 +42,8 @@ namespace zmq
class socket_base_t :
public own_t,
- public array_item_t
+ public array_item_t,
+ public i_poll_events
{
friend class reaper_t;
@@ -84,9 +87,15 @@ namespace zmq
void activated (class writer_t *pipe_);
void terminated (class writer_t *pipe_);
- // This function should be called only on sockets that are already
- // closed -- from the reaper thread. It tries to finalise the socket.
- bool reap ();
+ // Using this function reaper thread ask the socket to regiter with
+ // its poller.
+ void start_reaping (poller_t *poller_);
+
+ // i_poll_events implementation. This interface is used when socket
+ // is handled by the poller in the reaper thread.
+ void in_event ();
+ void out_event ();
+ void timer_event (int id_);
protected:
@@ -157,6 +166,10 @@ namespace zmq
// Socket's mailbox object.
mailbox_t mailbox;
+ // Reaper's poller and handle of this socket within it.
+ poller_t *poller;
+ poller_t::handle_t handle;
+
// Timestamp of when commands were processed the last time.
uint64_t last_tsc;