From aebff623f36efddc0de7a3192832b61802f8cec8 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 23 Dec 2009 19:37:56 +0100 Subject: ZMQII-28: Bidirectional introduction on TCP connection establishment --- src/socket_base.cpp | 83 +++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 58 insertions(+), 25 deletions(-) (limited to 'src/socket_base.cpp') diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fde258c..43209d5 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -32,7 +32,6 @@ #include "session.hpp" #include "config.hpp" #include "owned.hpp" -#include "uuid.hpp" #include "pipe.hpp" #include "err.hpp" #include "platform.hpp" @@ -46,7 +45,8 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : app_thread (parent_), shutting_down (false), sent_seqnum (0), - processed_seqnum (0) + processed_seqnum (0), + next_ordinal (1) { } @@ -114,10 +114,6 @@ int zmq::socket_base_t::bind (const char *addr_) int zmq::socket_base_t::connect (const char *addr_) { - // Generate a unique name for the session. - std::string session_name ("#"); - session_name += uuid_t ().to_string (); - // Parse addr_ string. std::string addr_type; std::string addr_args; @@ -170,10 +166,10 @@ int zmq::socket_base_t::connect (const char *addr_) return 0; } - // Create the session. + // Create unnamed session. io_thread_t *io_thread = choose_io_thread (options.affinity); - session_t *session = new (std::nothrow) session_t (io_thread, this, - session_name.c_str (), options, true); + session_t *session = new (std::nothrow) session_t (io_thread, + this, options); zmq_assert (session); pipe_t *in_pipe = NULL; @@ -213,7 +209,7 @@ int zmq::socket_base_t::connect (const char *addr_) // it is established. zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( choose_io_thread (options.affinity), this, options, - session_name.c_str (), false); + session->get_ordinal (), false); zmq_assert (connecter); int rc = connecter->set_address (addr_args.c_str ()); if (rc != 0) { @@ -245,8 +241,7 @@ int zmq::socket_base_t::connect (const char *addr_) // PGM sender. pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( - choose_io_thread (options.affinity), options, - session_name.c_str ()); + choose_io_thread (options.affinity), options); zmq_assert (pgm_sender); int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ()); @@ -261,8 +256,7 @@ int zmq::socket_base_t::connect (const char *addr_) // PGM receiver. pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( - choose_io_thread (options.affinity), options, - session_name.c_str ()); + choose_io_thread (options.affinity), options); zmq_assert (pgm_receiver); int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ()); @@ -408,7 +402,8 @@ int zmq::socket_base_t::close () // Check whether there are no session leaks. sessions_sync.lock (); - zmq_assert (sessions.empty ()); + zmq_assert (named_sessions.empty ()); + zmq_assert (unnamed_sessions.empty ()); sessions_sync.unlock (); delete this; @@ -445,36 +440,74 @@ bool zmq::socket_base_t::register_session (const char *name_, session_t *session_) { sessions_sync.lock (); - bool registered = sessions.insert (std::make_pair (name_, session_)).second; + bool registered = + named_sessions.insert (std::make_pair (name_, session_)).second; sessions_sync.unlock (); return registered; } -bool zmq::socket_base_t::unregister_session (const char *name_) +void zmq::socket_base_t::unregister_session (const char *name_) { sessions_sync.lock (); - sessions_t::iterator it = sessions.find (name_); - bool unregistered = (it != sessions.end ()); - sessions.erase (it); + named_sessions_t::iterator it = named_sessions.find (name_); + zmq_assert (it != named_sessions.end ()); + named_sessions.erase (it); sessions_sync.unlock (); - return unregistered; } zmq::session_t *zmq::socket_base_t::find_session (const char *name_) { sessions_sync.lock (); - sessions_t::iterator it = sessions.find (name_); - if (it == sessions.end ()) { + named_sessions_t::iterator it = named_sessions.find (name_); + if (it == named_sessions.end ()) { + sessions_sync.unlock (); + return NULL; + } + session_t *session = it->second; + + // Prepare the session for subsequent attach command. + session->inc_seqnum (); + + sessions_sync.unlock (); + return session; +} + +uint64_t zmq::socket_base_t::register_session (session_t *session_) +{ + sessions_sync.lock (); + uint64_t ordinal = next_ordinal; + next_ordinal++; + unnamed_sessions.insert (std::make_pair (ordinal, session_)).second; + sessions_sync.unlock (); + return ordinal; +} + +void zmq::socket_base_t::unregister_session (uint64_t ordinal_) +{ + sessions_sync.lock (); + unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_); + zmq_assert (it != unnamed_sessions.end ()); + unnamed_sessions.erase (it); + sessions_sync.unlock (); +} + +zmq::session_t *zmq::socket_base_t::find_session (uint64_t ordinal_) +{ + sessions_sync.lock (); + + unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_); + if (it == unnamed_sessions.end ()) { sessions_sync.unlock (); return NULL; } + session_t *session = it->second; // Prepare the session for subsequent attach command. - it->second->inc_seqnum (); + session->inc_seqnum (); sessions_sync.unlock (); - return it->second; + return session; } void zmq::socket_base_t::kill (reader_t *pipe_) -- cgit v1.2.3