diff options
Diffstat (limited to 'src/zmq_init.cpp')
-rw-r--r-- | src/zmq_init.cpp | 150 |
1 files changed, 79 insertions, 71 deletions
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index 5824f5c..b5c5c86 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -20,31 +20,35 @@ #include <string.h> #include "zmq_init.hpp" +#include "transient_session.hpp" +#include "named_session.hpp" +#include "socket_base.hpp" #include "zmq_engine.hpp" #include "io_thread.hpp" #include "session.hpp" #include "uuid.hpp" +#include "blob.hpp" #include "err.hpp" -zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_, - fd_t fd_, const options_t &options_, bool reconnect_, - const char *protocol_, const char *address_, uint64_t session_ordinal_) : - owned_t (parent_, owner_), +zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, + socket_base_t *socket_, session_t *session_, fd_t fd_, + const options_t &options_) : + own_t (io_thread_, options_), sent (false), received (false), - session_ordinal (session_ordinal_), - options (options_) + socket (socket_), + session (session_), + io_thread (io_thread_) { // Create the engine object for this connection. - engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, - reconnect_, protocol_, address_); + engine = new (std::nothrow) zmq_engine_t (fd_, options); zmq_assert (engine); } zmq::zmq_init_t::~zmq_init_t () { if (engine) - delete engine; + engine->terminate (); } bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) @@ -62,7 +66,7 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) // If initialisation is done, pass the engine to the session and // destroy the init object. - finalise (); + finalise_initialisation (); return true; } @@ -99,44 +103,28 @@ void zmq::zmq_init_t::flush () // If initialisation is done, pass the engine to the session and // destroy the init object. - finalise (); + finalise_initialisation (); } -void zmq::zmq_init_t::detach (owned_t *reconnecter_) +void zmq::zmq_init_t::detach () { // This function is called by engine when disconnection occurs. - // If required, launch the reconnecter. - if (reconnecter_) { - send_plug (reconnecter_); - send_own (owner, reconnecter_); - } + // If there is an associated session, send it a null engine to let it know + // that connection process was unsuccesful. + if (session) + send_attach (session, NULL, blob_t (), true); // The engine will destroy itself, so let's just drop the pointer here and // start termination of the init object. engine = NULL; - term (); -} - -zmq::io_thread_t *zmq::zmq_init_t::get_io_thread () -{ - return choose_io_thread (options.affinity); -} - -class zmq::socket_base_t *zmq::zmq_init_t::get_owner () -{ - return owner; -} - -uint64_t zmq::zmq_init_t::get_ordinal () -{ - return session_ordinal; + terminate (); } void zmq::zmq_init_t::process_plug () { zmq_assert (engine); - engine->plug (this); + engine->plug (io_thread, this); } void zmq::zmq_init_t::process_unplug () @@ -145,51 +133,71 @@ void zmq::zmq_init_t::process_unplug () engine->unplug (); } -void zmq::zmq_init_t::finalise () +void zmq::zmq_init_t::finalise_initialisation () { if (sent && received) { - // Disconnect the engine from the init object. - engine->unplug (); + // If we know what session we belong to, it's easy, just send the + // engine to that session and destroy the init object. Note that we + // know about the session only if this object is owned by it. Thus, + // lifetime of this object in contained in the lifetime of the session + // so the pointer cannot become invalid without notice. + if (session) { + engine->unplug (); + send_attach (session, engine, peer_identity, true); + engine = NULL; + terminate (); + return; + } - session_t *session = NULL; - - // If we have the session ordinal, let's use it to find the session. - // If it is not found, it means socket is already being shut down - // and the session have been deallocated. - // TODO: We should check whether the name of the peer haven't changed - // upon reconnection. - if (session_ordinal) { - session = owner->find_session (session_ordinal); - if (!session) { - term (); - return; - } + // All the cases below are listener-based. Therefore we need the socket + // reference so that new sessions can bind to that socket. + zmq_assert (socket); + + // We have no associated session. If the peer has no identity we'll + // create a transient session for the connection. Note that + // seqnum is incremented to account for attach command before the + // session is launched. That way we are sure it won't terminate before + // being attached. + if (peer_identity [0] == 0) { + session = new (std::nothrow) transient_session_t (io_thread, + socket, options); + zmq_assert (session); + session->inc_seqnum (); + launch_sibling (session); + engine->unplug (); + send_attach (session, engine, peer_identity, false); + engine = NULL; + terminate (); + return; } - else { - - // If the peer has a unique name, find the associated session. - // If it does not exist, create it. - zmq_assert (!peer_identity.empty ()); - session = owner->find_session (peer_identity); - if (!session) { - session = new (std::nothrow) session_t ( - choose_io_thread (options.affinity), owner, options, - peer_identity); - zmq_assert (session); - send_plug (session); - send_own (owner, session); - - // Reserve a sequence number for following 'attach' command. - session->inc_seqnum (); - } + + // Try to find the session corresponding to the peer's identity. + // If found, send the engine to that session and destroy this object. + // Note that session's seqnum is incremented by find_session rather + // than by send_attach. + session = socket->find_session (peer_identity); + if (session) { + engine->unplug (); + send_attach (session, engine, peer_identity, false); + engine = NULL; + terminate (); + return; } - // No need to increment seqnum as it was already incremented above. + // There's no such named session. We have to create one. Note that + // seqnum is incremented to account for attach command before the + // session is launched. That way we are sure it won't terminate before + // being attached. + session = new (std::nothrow) named_session_t (io_thread, socket, + options, peer_identity); + zmq_assert (session); + session->inc_seqnum (); + launch_sibling (session); + engine->unplug (); send_attach (session, engine, peer_identity, false); - - // Destroy the init object. engine = NULL; - term (); + terminate (); + return; } } |