From 6be4b0143793ab5ceebc5d9d6bbe5c2f1333a0d2 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 21 Aug 2009 14:29:22 +0200 Subject: session management implemented --- src/socket_base.cpp | 77 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 53 insertions(+), 24 deletions(-) (limited to 'src/socket_base.cpp') diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fa6c1e3..fb7bdcf 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -17,6 +17,7 @@ along with this program. If not, see . */ +#include #include #include "../include/zmq.h" @@ -30,16 +31,20 @@ #include "session.hpp" #include "config.hpp" #include "owned.hpp" +#include "uuid.hpp" zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), pending_term_acks (0), - app_thread (parent_) + app_thread (parent_), + shutting_down (false) { } zmq::socket_base_t::~socket_base_t () { + shutting_down = true; + while (true) { // On third pass of the loop there should be no more I/O objects @@ -64,10 +69,12 @@ zmq::socket_base_t::~socket_base_t () } // Check whether there are no session leaks. + sessions_sync.lock (); zmq_assert (sessions.empty ()); + sessions_sync.unlock (); } -int zmq::socket_base_t::setsockopt (int option_, void *optval_, +int zmq::socket_base_t::setsockopt (int option_, const void *optval_, size_t optvallen_) { switch (option_) { @@ -113,11 +120,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_, return 0; case ZMQ_IDENTITY: - if (optvallen_ != sizeof (const char*)) { - errno = EINVAL; - return -1; - } - options.identity = (const char*) optval_; + options.identity.assign ((const char*) optval_, optvallen_); return 0; default: @@ -141,18 +144,34 @@ int zmq::socket_base_t::bind (const char *addr_) int zmq::socket_base_t::connect (const char *addr_) { + // Generate a unique name for the session. + std::string session_name ("#"); + session_name += uuid_t ().to_string (); + + // Create the session. + io_thread_t *io_thread = choose_io_thread (options.affinity); + session_t *session = new session_t (io_thread, this, session_name.c_str ()); + zmq_assert (session); + send_plug (session); + send_own (this, session); + + // Create the connecter object. Supply it with the session name so that + // it can bind the new connection to the session once it is established. zmq_connecter_t *connecter = new zmq_connecter_t ( - choose_io_thread (options.affinity), this, options); + choose_io_thread (options.affinity), this, options, + session_name.c_str ()); int rc = connecter->set_address (addr_); - if (rc != 0) + if (rc != 0) { + delete connecter; return -1; - + } send_plug (connecter); send_own (this, connecter); + return 0; } -int zmq::socket_base_t::send (struct zmq_msg *msg_, int flags_) +int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) { zmq_assert (false); } @@ -162,7 +181,7 @@ int zmq::socket_base_t::flush () zmq_assert (false); } -int zmq::socket_base_t::recv (struct zmq_msg *msg_, int flags_) +int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) { zmq_assert (false); } @@ -174,35 +193,40 @@ int zmq::socket_base_t::close () return 0; } -void zmq::socket_base_t::register_session (const char *name_, +bool zmq::socket_base_t::register_session (const char *name_, session_t *session_) { sessions_sync.lock (); - bool inserted = sessions.insert (std::make_pair (name_, session_)).second; - zmq_assert (inserted); + bool registered = sessions.insert (std::make_pair (name_, session_)).second; sessions_sync.unlock (); + return registered; } -void zmq::socket_base_t::unregister_session (const char *name_) +bool zmq::socket_base_t::unregister_session (const char *name_) { sessions_sync.lock (); sessions_t::iterator it = sessions.find (name_); - zmq_assert (it != sessions.end ()); + bool unregistered = (it != sessions.end ()); sessions.erase (it); sessions_sync.unlock (); + return unregistered; } -zmq::session_t *zmq::socket_base_t::get_session (const char *name_) +zmq::session_t *zmq::socket_base_t::find_session (const char *name_) { sessions_sync.lock (); + sessions_t::iterator it = sessions.find (name_); - session_t *session = NULL; - if (it != sessions.end ()) { - session = it->second; - session->inc_seqnum (); - } + if (it == sessions.end ()) { + sessions_sync.unlock (); + return NULL; + } + + // Prepare the session for subsequent attach command. + it->second->inc_seqnum (); + sessions_sync.unlock (); - return session; + return it->second; } void zmq::socket_base_t::process_own (owned_t *object_) @@ -212,6 +236,11 @@ void zmq::socket_base_t::process_own (owned_t *object_) void zmq::socket_base_t::process_term_req (owned_t *object_) { + // When shutting down we can ignore termination requests from owned + // objects. They are going to be terminated anyway. + if (shutting_down) + return; + // If I/O object is well and alive ask it to terminate. io_objects_t::iterator it = std::find (io_objects.begin (), io_objects.end (), object_); -- cgit v1.2.3