From e645fc2693acc796304498909786b7b47005b429 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Mon, 23 Jan 2012 08:53:35 +0100 Subject: Imported Upstream version 2.1.3 --- src/zmq_init.cpp | 185 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 103 insertions(+), 82 deletions(-) (limited to 'src/zmq_init.cpp') diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index 5824f5c..cf65d69 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -1,50 +1,56 @@ /* - Copyright (c) 2007-2010 iMatix Corporation + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by + the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. 0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. + GNU Lesser General Public License for more details. - You should have received a copy of the Lesser GNU General Public License + You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #include #include "zmq_init.hpp" +#include "transient_session.hpp" +#include "named_session.hpp" +#include "socket_base.hpp" #include "zmq_engine.hpp" #include "io_thread.hpp" #include "session.hpp" #include "uuid.hpp" +#include "blob.hpp" #include "err.hpp" -zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_, - fd_t fd_, const options_t &options_, bool reconnect_, - const char *protocol_, const char *address_, uint64_t session_ordinal_) : - owned_t (parent_, owner_), +zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, + socket_base_t *socket_, session_t *session_, fd_t fd_, + const options_t &options_) : + own_t (io_thread_, options_), + ephemeral_engine (NULL), sent (false), received (false), - session_ordinal (session_ordinal_), - options (options_) + socket (socket_), + session (session_), + io_thread (io_thread_) { // Create the engine object for this connection. - engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, - reconnect_, protocol_, address_); - zmq_assert (engine); + engine = new (std::nothrow) zmq_engine_t (fd_, options); + alloc_assert (engine); } zmq::zmq_init_t::~zmq_init_t () { if (engine) - delete engine; + engine->terminate (); } bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) @@ -60,9 +66,8 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) options.identity.size ()); sent = true; - // If initialisation is done, pass the engine to the session and - // destroy the init object. - finalise (); + // Try finalize initialization. + finalise_initialisation (); return true; } @@ -85,9 +90,14 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_), zmq_msg_size (msg_)); } + int rc = zmq_msg_close (msg_); + zmq_assert (rc == 0); received = true; + // Try finalize initialization. + finalise_initialisation (); + return true; } @@ -97,46 +107,30 @@ void zmq::zmq_init_t::flush () if (!received) return; - // If initialisation is done, pass the engine to the session and - // destroy the init object. - finalise (); + // Initialization is done, dispatch engine. + if (ephemeral_engine) + dispatch_engine (); } -void zmq::zmq_init_t::detach (owned_t *reconnecter_) +void zmq::zmq_init_t::detach () { // This function is called by engine when disconnection occurs. - // If required, launch the reconnecter. - if (reconnecter_) { - send_plug (reconnecter_); - send_own (owner, reconnecter_); - } + // If there is an associated session, send it a null engine to let it know + // that connection process was unsuccesful. + if (session) + send_attach (session, NULL, blob_t (), true); // The engine will destroy itself, so let's just drop the pointer here and // start termination of the init object. engine = NULL; - term (); -} - -zmq::io_thread_t *zmq::zmq_init_t::get_io_thread () -{ - return choose_io_thread (options.affinity); -} - -class zmq::socket_base_t *zmq::zmq_init_t::get_owner () -{ - return owner; -} - -uint64_t zmq::zmq_init_t::get_ordinal () -{ - return session_ordinal; + terminate (); } void zmq::zmq_init_t::process_plug () { zmq_assert (engine); - engine->plug (this); + engine->plug (io_thread, this); } void zmq::zmq_init_t::process_unplug () @@ -145,51 +139,78 @@ void zmq::zmq_init_t::process_unplug () engine->unplug (); } -void zmq::zmq_init_t::finalise () +void zmq::zmq_init_t::finalise_initialisation () +{ + // Unplug and prepare to dispatch engine. + if (sent && received) { + ephemeral_engine = engine; + engine = NULL; + ephemeral_engine->unplug (); + return; + } +} + +void zmq::zmq_init_t::dispatch_engine () { if (sent && received) { - // Disconnect the engine from the init object. - engine->unplug (); + // Engine must be detached. + zmq_assert (!engine); + zmq_assert (ephemeral_engine); + + // If we know what session we belong to, it's easy, just send the + // engine to that session and destroy the init object. Note that we + // know about the session only if this object is owned by it. Thus, + // lifetime of this object in contained in the lifetime of the session + // so the pointer cannot become invalid without notice. + if (session) { + send_attach (session, ephemeral_engine, peer_identity, true); + terminate (); + return; + } - session_t *session = NULL; - - // If we have the session ordinal, let's use it to find the session. - // If it is not found, it means socket is already being shut down - // and the session have been deallocated. - // TODO: We should check whether the name of the peer haven't changed - // upon reconnection. - if (session_ordinal) { - session = owner->find_session (session_ordinal); - if (!session) { - term (); - return; - } + // All the cases below are listener-based. Therefore we need the socket + // reference so that new sessions can bind to that socket. + zmq_assert (socket); + + // We have no associated session. If the peer has no identity we'll + // create a transient session for the connection. Note that + // seqnum is incremented to account for attach command before the + // session is launched. That way we are sure it won't terminate before + // being attached. + if (peer_identity [0] == 0) { + session = new (std::nothrow) transient_session_t (io_thread, + socket, options); + alloc_assert (session); + session->inc_seqnum (); + launch_sibling (session); + send_attach (session, ephemeral_engine, peer_identity, false); + terminate (); + return; } - else { - - // If the peer has a unique name, find the associated session. - // If it does not exist, create it. - zmq_assert (!peer_identity.empty ()); - session = owner->find_session (peer_identity); - if (!session) { - session = new (std::nothrow) session_t ( - choose_io_thread (options.affinity), owner, options, - peer_identity); - zmq_assert (session); - send_plug (session); - send_own (owner, session); - - // Reserve a sequence number for following 'attach' command. - session->inc_seqnum (); - } + + // Try to find the session corresponding to the peer's identity. + // If found, send the engine to that session and destroy this object. + // Note that session's seqnum is incremented by find_session rather + // than by send_attach. + session = socket->find_session (peer_identity); + if (session) { + send_attach (session, ephemeral_engine, peer_identity, false); + terminate (); + return; } - // No need to increment seqnum as it was already incremented above. - send_attach (session, engine, peer_identity, false); - - // Destroy the init object. - engine = NULL; - term (); + // There's no such named session. We have to create one. Note that + // seqnum is incremented to account for attach command before the + // session is launched. That way we are sure it won't terminate before + // being attached. + session = new (std::nothrow) named_session_t (io_thread, socket, + options, peer_identity); + alloc_assert (session); + session->inc_seqnum (); + launch_sibling (session); + send_attach (session, ephemeral_engine, peer_identity, false); + terminate (); + return; } } -- cgit v1.2.3