diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/app_thread.cpp | 4 | ||||
-rw-r--r-- | src/dispatcher.cpp | 36 | ||||
-rw-r--r-- | src/dispatcher.hpp | 25 | ||||
-rw-r--r-- | src/object.cpp | 5 | ||||
-rw-r--r-- | src/object.hpp | 1 | ||||
-rw-r--r-- | src/session.cpp | 7 | ||||
-rw-r--r-- | src/socket_base.cpp | 12 | ||||
-rw-r--r-- | src/zmq.cpp | 3 |
8 files changed, 82 insertions, 11 deletions
diff --git a/src/app_thread.cpp b/src/app_thread.cpp index e108594..58fe19d 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -51,9 +51,7 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : zmq::app_thread_t::~app_thread_t () { - // Destroy all the sockets owned by this application thread. - for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) - delete *it; + zmq_assert (sockets.empty ()); } zmq::i_signaler *zmq::app_thread_t::get_signaler () diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 71e20df..49c2197 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -30,7 +30,9 @@ #include "windows.h" #endif -zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) +zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) : + sockets (0), + terminated (false) { #ifdef ZMQ_HAVE_WINDOWS // Intialise Windows sockets. Note that WSAStartup can be called multiple @@ -68,6 +70,20 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) io_threads [i]->start (); } +int zmq::dispatcher_t::term () +{ + term_sync.lock (); + zmq_assert (!terminated); + terminated = true; + bool destroy = (sockets == 0); + term_sync.unlock (); + + if (destroy) + delete this; + + return 0; +} + zmq::dispatcher_t::~dispatcher_t () { // Close all application theads, sockets, io_objects etc. @@ -111,9 +127,27 @@ zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_) } threads_sync.unlock (); + term_sync.lock (); + sockets++; + term_sync.unlock (); + return thread->create_socket (type_); } +void zmq::dispatcher_t::destroy_socket () +{ + // If zmq_term was already called and there are no more sockets, + // terminate the whole 0MQ infrastructure. + term_sync.lock (); + zmq_assert (sockets > 0); + sockets--; + bool destroy = (sockets == 0 && terminated); + term_sync.unlock (); + + if (destroy) + delete this; +} + zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () { // Check whether thread ID is already assigned. If so, return it. diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index cb445ef..bd1f655 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -52,12 +52,18 @@ namespace zmq // signalers. dispatcher_t (int app_threads_, int io_threads_); - // To be called to terminate the whole infrastructure (zmq_term). - ~dispatcher_t (); + // This function is called when user invokes zmq_term. If there are + // no more sockets open it'll cause all the infrastructure to be shut + // down. If there are open sockets still, the deallocation happens + // after the last one is closed. + int term (); // Create a socket. class socket_base_t *create_socket (int type_); + // Destroy a socket. + void destroy_socket (); + // Returns number of thread slots in the dispatcher. To be used by // individual threads to find out how many distinct signals can be // received. @@ -93,6 +99,8 @@ namespace zmq private: + ~dispatcher_t (); + // Returns the app thread associated with the current thread. // NULL if we are out of app thread slots. class app_thread_t *choose_app_thread (); @@ -127,9 +135,20 @@ namespace zmq typedef std::set <class pipe_t*> pipes_t; pipes_t pipes; - // Synchronisation of access to the pipes repository. + // Synchronisation of access to the pipes repository. mutex_t pipes_sync; + // Number of sockets alive. + int sockets; + + // If true, zmq_term was already called. When last socket is closed + // the whole 0MQ infrastructure should be deallocated. + bool terminated; + + // Synchronisation of access to the termination data (socket count + // and 'terminated' flag). + mutex_t term_sync; + dispatcher_t (const dispatcher_t&); void operator = (const dispatcher_t&); }; diff --git a/src/object.cpp b/src/object.cpp index c0ef21c..1433b7b 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -53,6 +53,11 @@ int zmq::object_t::get_thread_slot () return thread_slot; } +zmq::dispatcher_t *zmq::object_t::get_dispatcher () +{ + return dispatcher; +} + void zmq::object_t::process_command (command_t &cmd_) { switch (cmd_.type) { diff --git a/src/object.hpp b/src/object.hpp index 250e856..2e41507 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -40,6 +40,7 @@ namespace zmq ~object_t (); int get_thread_slot (); + dispatcher_t *get_dispatcher (); void process_command (struct command_t &cmd_); // Allow pipe to access corresponding dispatcher functions. diff --git a/src/session.cpp b/src/session.cpp index ac2dd12..bc334e0 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -54,7 +54,12 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) bool zmq::session_t::write (::zmq_msg_t *msg_) { - return out_pipe->write (msg_); + if (out_pipe->write (msg_)) { + zmq_msg_init (msg_); + return true; + } + + return false; } void zmq::session_t::flush () diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 6ad1f55..93a0a4c 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -24,7 +24,7 @@ #include "socket_base.hpp" #include "app_thread.hpp" -#include "err.hpp" +#include "dispatcher.hpp" #include "zmq_listener.hpp" #include "zmq_connecter.hpp" #include "msg_content.hpp" @@ -34,6 +34,7 @@ #include "owned.hpp" #include "uuid.hpp" #include "pipe.hpp" +#include "err.hpp" zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), @@ -288,7 +289,16 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::close () { app_thread->remove_socket (this); + + // Pointer to the dispatcher must be retrieved before the socket is + // deallocated. Afterwards it is not available. + dispatcher_t *dispatcher = get_dispatcher (); delete this; + + // This function must be called after the socket is completely deallocated + // as it may cause termination of the whole 0MQ infrastructure. + dispatcher->destroy_socket (); + return 0; } diff --git a/src/zmq.cpp b/src/zmq.cpp index 49096ad..0ffd530 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -183,8 +183,7 @@ void *zmq_init (int app_threads_, int io_threads_) int zmq_term (void *dispatcher_) { - delete (zmq::dispatcher_t*) dispatcher_; - return 0; + return ((zmq::dispatcher_t*) dispatcher_)->term (); } void *zmq_socket (void *dispatcher_, int type_) |