summaryrefslogtreecommitdiff
path: root/src/ctx.hpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-02-09 15:32:15 +0100
committerMartin Sustrik <sustrik@250bpm.com>2011-02-09 15:32:15 +0100
commit80ac398bba928fa7f245d2e107071677a13185cf (patch)
tree57c51a7adb5f4cb1a91396fe0b223538a4d428d7 /src/ctx.hpp
parent889424e675eecd9d9c7d1121456401d5c43029a5 (diff)
Initial implementation of reaper thread.
Reaper thread destroys the socket asynchronously. zmq_term() can be interrupted by a signal (EINTR). zmq_socket() will return ETERM after zmq_term() was called. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/ctx.hpp')
-rw-r--r--src/ctx.hpp43
1 files changed, 21 insertions, 22 deletions
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 92684e6..c07711e 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -64,11 +64,9 @@ namespace zmq
// after the last one is closed.
int terminate ();
- // Create a socket.
+ // Create and destroy a socket.
class socket_base_t *create_socket (int type_);
-
- // Make socket a zombie.
- void zombify_socket (socket_base_t *socket_);
+ void destroy_socket (class socket_base_t *socket_);
// Send command to the destination thread.
void send_command (uint32_t tid_, const command_t &command_);
@@ -78,6 +76,9 @@ namespace zmq
// Returns NULL is no I/O thread is available.
class io_thread_t *choose_io_thread (uint64_t affinity_);
+ // Returns reaper thread object.
+ class object_t *get_reaper ();
+
// Management of inproc endpoints.
int register_endpoint (const char *addr_, endpoint_t &endpoint_);
void unregister_endpoints (class socket_base_t *socket_);
@@ -86,41 +87,36 @@ namespace zmq
// Logging.
void log (zmq_msg_t *msg_);
+ enum {
+ term_tid = 0,
+ reaper_tid = 1
+ };
+
private:
~ctx_t ();
- // Sockets belonging to this context.
+ // Sockets belonging to this context. We need the list so that
+ // we can notify the sockets when zmq_term() is called. The sockets
+ // will return ETERM then.
typedef array_t <socket_base_t> sockets_t;
sockets_t sockets;
- // List of sockets that were already closed but not yet deallocated.
- // These sockets still have some pipes and I/O objects attached.
- typedef std::vector <socket_base_t*> zombies_t;
- zombies_t zombies;
-
// List of unused thread slots.
typedef std::vector <uint32_t> emtpy_slots_t;
emtpy_slots_t empty_slots;
- // If true, shutdown thread wants to be informed when there are no
- // more open sockets. Do so by posting no_sockets_sync semaphore.
- // Note that this variable is synchronised by slot_sync mutex.
- bool no_sockets_notify;
-
- // Object used by zmq_term to wait while all the sockets are closed
- // by different application threads.
- semaphore_t no_sockets_sync;
+ // If true, zmq_term was already called.
+ bool terminating;
// Synchronisation of accesses to global slot-related data:
- // sockets, zombies, empty_slots, terminated. It also synchronises
+ // sockets, empty_slots, terminating. It also synchronises
// access to zombie sockets as such (as oposed to slots) and provides
// a memory barrier to ensure that all CPU cores see the same data.
mutex_t slot_sync;
- // This function attempts to deallocate as many zombie sockets as
- // possible. It must be called within a slot_sync critical section.
- void dezombify ();
+ // The reaper thread.
+ class reaper_t *reaper;
// I/O threads.
typedef std::vector <class io_thread_t*> io_threads_t;
@@ -130,6 +126,9 @@ namespace zmq
uint32_t slot_count;
mailbox_t **slots;
+ // Mailbox for zmq_term thread.
+ mailbox_t term_mailbox;
+
// List of inproc endpoints within this context.
typedef std::map <std::string, endpoint_t> endpoints_t;
endpoints_t endpoints;