diff options
| author | Martin Lucina <mato@kotelna.sk> | 2011-03-28 10:39:51 +0200 | 
|---|---|---|
| committer | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:37 +0100 | 
| commit | 3e20cb1b8a2b1ca222011df37334e5f4f88dd565 (patch) | |
| tree | 4a753775186bc7f583f1ceb3f9aa675b6f110596 /src/zmq_init.cpp | |
| parent | 3f0085ddbef1a44b6bb7a0b23af497d56e0025fa (diff) | |
| parent | e645fc2693acc796304498909786b7b47005b429 (diff) | |
Imported Debian patch 2.1.3-1debian/2.1.3-1
Diffstat (limited to 'src/zmq_init.cpp')
| -rw-r--r-- | src/zmq_init.cpp | 185 | 
1 files changed, 103 insertions, 82 deletions
| diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index 5824f5c..cf65d69 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -1,50 +1,56 @@  /* -    Copyright (c) 2007-2010 iMatix Corporation +    Copyright (c) 2007-2011 iMatix Corporation +    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file      This file is part of 0MQ.      0MQ is free software; you can redistribute it and/or modify it under -    the terms of the Lesser GNU General Public License as published by +    the terms of the GNU Lesser General Public License as published by      the Free Software Foundation; either version 3 of the License, or      (at your option) any later version.      0MQ is distributed in the hope that it will be useful,      but WITHOUT ANY WARRANTY; without even the implied warranty of      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    Lesser GNU General Public License for more details. +    GNU Lesser General Public License for more details. -    You should have received a copy of the Lesser GNU General Public License +    You should have received a copy of the GNU Lesser General Public License      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */  #include <string.h>  #include "zmq_init.hpp" +#include "transient_session.hpp" +#include "named_session.hpp" +#include "socket_base.hpp"  #include "zmq_engine.hpp"  #include "io_thread.hpp"  #include "session.hpp"  #include "uuid.hpp" +#include "blob.hpp"  #include "err.hpp" -zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_, -      fd_t fd_, const options_t &options_, bool reconnect_, -      const char *protocol_, const char *address_, uint64_t session_ordinal_) : -    owned_t (parent_, owner_), +zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, +      socket_base_t *socket_, session_t *session_, fd_t fd_, +      const options_t &options_) : +    own_t (io_thread_, options_), +    ephemeral_engine (NULL),      sent (false),      received (false), -    session_ordinal (session_ordinal_), -    options (options_) +    socket (socket_), +    session (session_), +    io_thread (io_thread_)  {      //  Create the engine object for this connection. -    engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, -        reconnect_, protocol_, address_); -    zmq_assert (engine); +    engine = new (std::nothrow) zmq_engine_t (fd_, options); +    alloc_assert (engine);  }  zmq::zmq_init_t::~zmq_init_t ()  {      if (engine) -        delete engine; +        engine->terminate ();  }  bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) @@ -60,9 +66,8 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)          options.identity.size ());      sent = true; -    //  If initialisation is done, pass the engine to the session and -    //  destroy the init object. -    finalise (); +    //  Try finalize initialization. +    finalise_initialisation ();      return true;  } @@ -85,9 +90,14 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)          peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),              zmq_msg_size (msg_));      } +    int rc = zmq_msg_close (msg_); +    zmq_assert (rc == 0);      received = true; +    //  Try finalize initialization. +    finalise_initialisation (); +      return true;  } @@ -97,46 +107,30 @@ void zmq::zmq_init_t::flush ()      if (!received)          return; -    //  If initialisation is done, pass the engine to the session and -    //  destroy the init object. -    finalise (); +    //  Initialization is done, dispatch engine. +    if (ephemeral_engine) +        dispatch_engine ();  } -void zmq::zmq_init_t::detach (owned_t *reconnecter_) +void zmq::zmq_init_t::detach ()  {      //  This function is called by engine when disconnection occurs. -    //  If required, launch the reconnecter. -    if (reconnecter_) { -        send_plug (reconnecter_); -        send_own (owner, reconnecter_); -    } +    //  If there is an associated session, send it a null engine to let it know +    //  that connection process was unsuccesful. +    if (session) +        send_attach (session, NULL, blob_t (), true);      //  The engine will destroy itself, so let's just drop the pointer here and      //  start termination of the init object.      engine = NULL; -    term (); -} - -zmq::io_thread_t *zmq::zmq_init_t::get_io_thread () -{ -    return choose_io_thread (options.affinity); -} - -class zmq::socket_base_t *zmq::zmq_init_t::get_owner () -{ -    return owner; -} - -uint64_t zmq::zmq_init_t::get_ordinal () -{ -    return session_ordinal; +    terminate ();  }  void zmq::zmq_init_t::process_plug ()  {      zmq_assert (engine); -    engine->plug (this); +    engine->plug (io_thread, this);  }  void zmq::zmq_init_t::process_unplug () @@ -145,51 +139,78 @@ void zmq::zmq_init_t::process_unplug ()          engine->unplug ();  } -void zmq::zmq_init_t::finalise () +void zmq::zmq_init_t::finalise_initialisation () +{ +     //  Unplug and prepare to dispatch engine. +     if (sent && received) { +        ephemeral_engine = engine; +        engine = NULL; +        ephemeral_engine->unplug (); +        return; +    } +} + +void zmq::zmq_init_t::dispatch_engine ()  {      if (sent && received) { -        //  Disconnect the engine from the init object. -        engine->unplug (); +        //  Engine must be detached. +        zmq_assert (!engine); +        zmq_assert (ephemeral_engine); + +        //  If we know what session we belong to, it's easy, just send the +        //  engine to that session and destroy the init object. Note that we +        //  know about the session only if this object is owned by it. Thus, +        //  lifetime of this object in contained in the lifetime of the session +        //  so the pointer cannot become invalid without notice. +        if (session) { +            send_attach (session, ephemeral_engine, peer_identity, true); +            terminate (); +            return; +        } -        session_t *session = NULL; -         -        //  If we have the session ordinal, let's use it to find the session. -        //  If it is not found, it means socket is already being shut down -        //  and the session have been deallocated. -        //  TODO: We should check whether the name of the peer haven't changed -        //  upon reconnection. -        if (session_ordinal) { -            session = owner->find_session (session_ordinal); -            if (!session) { -                term (); -                return; -            } +        //  All the cases below are listener-based. Therefore we need the socket +        //  reference so that new sessions can bind to that socket. +        zmq_assert (socket); + +        //  We have no associated session. If the peer has no identity we'll +        //  create a transient session for the connection. Note that +        //  seqnum is incremented to account for attach command before the +        //  session is launched. That way we are sure it won't terminate before +        //  being attached. +        if (peer_identity [0] == 0) { +            session = new (std::nothrow) transient_session_t (io_thread, +                socket, options); +            alloc_assert (session); +            session->inc_seqnum (); +            launch_sibling (session); +            send_attach (session, ephemeral_engine, peer_identity, false); +            terminate (); +            return;          } -        else { - -            //  If the peer has a unique name, find the associated session. -            //  If it does not exist, create it. -            zmq_assert (!peer_identity.empty ()); -            session = owner->find_session (peer_identity); -            if (!session) { -                session = new (std::nothrow) session_t ( -                    choose_io_thread (options.affinity), owner, options, -                    peer_identity); -                zmq_assert (session); -                send_plug (session); -                send_own (owner, session); - -                //  Reserve a sequence number for following 'attach' command. -                session->inc_seqnum (); -            } +         +        //  Try to find the session corresponding to the peer's identity. +        //  If found, send the engine to that session and destroy this object. +        //  Note that session's seqnum is incremented by find_session rather +        //  than by send_attach. +        session = socket->find_session (peer_identity); +        if (session) { +            send_attach (session, ephemeral_engine, peer_identity, false); +            terminate (); +            return;          } -        //  No need to increment seqnum as it was already incremented above. -        send_attach (session, engine, peer_identity, false); - -        //  Destroy the init object. -        engine = NULL; -        term (); +        //  There's no such named session. We have to create one. Note that +        //  seqnum is incremented to account for attach command before the +        //  session is launched. That way we are sure it won't terminate before +        //  being attached. +        session = new (std::nothrow) named_session_t (io_thread, socket, +            options, peer_identity); +        alloc_assert (session); +        session->inc_seqnum (); +        launch_sibling (session); +        send_attach (session, ephemeral_engine, peer_identity, false); +        terminate (); +        return;      }  } | 
