From a801b6d8b37557ccfb53030dca22f89a3f99b59c Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 20 Aug 2009 11:32:23 +0200 Subject: couple of bugs in shutdown mechanism fixed --- src/socket_base.hpp | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) (limited to 'src/socket_base.hpp') diff --git a/src/socket_base.hpp b/src/socket_base.hpp index e96cc2d..8e99654 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -21,9 +21,11 @@ #define __ZMQ_SOCKET_BASE_HPP_INCLUDED__ #include +#include #include #include "object.hpp" +#include "mutex.hpp" #include "options.hpp" #include "stdint.hpp" @@ -46,16 +48,26 @@ namespace zmq virtual int recv (struct zmq_msg *msg_, int flags_); virtual int close (); + // Functions that owned objects use to manipulate socket's list + // of existing sessions. + // Note that this functionality cannot be implemented via inter-thread + // commands as it is unacceptable to wait for the completion of the + // action till user application yields control of the application + // thread to 0MQ. + void register_session (const char *name_, class session_t *session_); + void unregister_session (const char *name_); + class session_t *get_session (const char *name_); + private: // Handlers for incoming commands. - void process_own (object_t *object_); - void process_term_req (object_t *object_); + void process_own (class owned_t *object_); + void process_term_req (class owned_t *object_); void process_term_ack (); // List of all I/O objects owned by this socket. The socket is // responsible for deallocating them before it quits. - typedef std::set io_objects_t; + typedef std::set io_objects_t; io_objects_t io_objects; // Number of I/O objects that were already asked to terminate @@ -68,6 +80,14 @@ namespace zmq // Socket options. options_t options; + // List of existing sessions. This list is never referenced from within + // the socket, instead it is used by I/O objects owned by the session. + // As those objects can live in different threads, the access is + // synchronised using 'sessions_sync' mutex. + typedef std::map sessions_t; + sessions_t sessions; + mutex_t sessions_sync; + socket_base_t (const socket_base_t&); void operator = (const socket_base_t&); }; -- cgit v1.2.3