From a801b6d8b37557ccfb53030dca22f89a3f99b59c Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 20 Aug 2009 11:32:23 +0200 Subject: couple of bugs in shutdown mechanism fixed --- src/socket_base.cpp | 40 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) (limited to 'src/socket_base.cpp') diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 9701f65..fa6c1e3 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -27,7 +27,9 @@ #include "zmq_listener.hpp" #include "zmq_connecter.hpp" #include "io_thread.hpp" +#include "session.hpp" #include "config.hpp" +#include "owned.hpp" zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), @@ -60,6 +62,9 @@ zmq::socket_base_t::~socket_base_t () while (pending_term_acks) app_thread->process_commands (true); } + + // Check whether there are no session leaks. + zmq_assert (sessions.empty ()); } int zmq::socket_base_t::setsockopt (int option_, void *optval_, @@ -169,12 +174,43 @@ int zmq::socket_base_t::close () return 0; } -void zmq::socket_base_t::process_own (object_t *object_) +void zmq::socket_base_t::register_session (const char *name_, + session_t *session_) +{ + sessions_sync.lock (); + bool inserted = sessions.insert (std::make_pair (name_, session_)).second; + zmq_assert (inserted); + sessions_sync.unlock (); +} + +void zmq::socket_base_t::unregister_session (const char *name_) +{ + sessions_sync.lock (); + sessions_t::iterator it = sessions.find (name_); + zmq_assert (it != sessions.end ()); + sessions.erase (it); + sessions_sync.unlock (); +} + +zmq::session_t *zmq::socket_base_t::get_session (const char *name_) +{ + sessions_sync.lock (); + sessions_t::iterator it = sessions.find (name_); + session_t *session = NULL; + if (it != sessions.end ()) { + session = it->second; + session->inc_seqnum (); + } + sessions_sync.unlock (); + return session; +} + +void zmq::socket_base_t::process_own (owned_t *object_) { io_objects.insert (object_); } -void zmq::socket_base_t::process_term_req (object_t *object_) +void zmq::socket_base_t::process_term_req (owned_t *object_) { // If I/O object is well and alive ask it to terminate. io_objects_t::iterator it = std::find (io_objects.begin (), -- cgit v1.2.3