/* Copyright (c) 2007-2011 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. 0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #include #include "zmq_init.hpp" #include "transient_session.hpp" #include "named_session.hpp" #include "socket_base.hpp" #include "zmq_engine.hpp" #include "io_thread.hpp" #include "session.hpp" #include "uuid.hpp" #include "blob.hpp" #include "wire.hpp" #include "err.hpp" zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, socket_base_t *socket_, session_t *session_, fd_t fd_, const options_t &options_) : own_t (io_thread_, options_), ephemeral_engine (NULL), received (false), socket (socket_), session (session_), io_thread (io_thread_) { // Create the engine object for this connection. engine = new (std::nothrow) zmq_engine_t (fd_, options); alloc_assert (engine); // Generate an unique identity. peer_identity.resize (17); peer_identity [0] = 0; generate_uuid (&peer_identity [1]); // Create a list of props to send. msg_t msg; int rc = msg.init_size (4); errno_assert (rc == 0); unsigned char *data = (unsigned char*) msg.data (); put_uint16 (data, prop_type); put_uint16 (data + 2, options.type); msg.set_flags (msg_t::more); to_send.push_back (msg); if (!options.identity.empty ()) { rc = msg.init_size (2 + options.identity.size ()); errno_assert (rc == 0); data = (unsigned char*) msg.data (); put_uint16 (data, prop_identity); memcpy (data + 2, options.identity.data (), options.identity.size ()); msg.set_flags (msg_t::more); to_send.push_back (msg); } // Remove the MORE flag from the last prop. to_send.back ().reset_flags (msg_t::more); } zmq::zmq_init_t::~zmq_init_t () { if (engine) engine->terminate (); // If there are unsent props still queued deallocate them. for (to_send_t::iterator it = to_send.begin (); it != to_send.end (); ++it) { int rc = it->close (); errno_assert (rc == 0); } to_send.clear (); } bool zmq::zmq_init_t::read (msg_t *msg_) { // If the identity was already sent, do nothing. if (to_send.empty ()) return false; // Pass next property to the engine. *msg_ = to_send.front (); to_send.erase (to_send.begin ()); // Try finalize initialization. finalise_initialisation (); return true; } bool zmq::zmq_init_t::write (msg_t *msg_) { // If identity was already received, we are not interested // in subsequent messages. if (received) return false; size_t size = msg_->size (); unsigned char *data = (unsigned char*) msg_->data (); // There should be at least property type in the message. zmq_assert (size >= 2); uint16_t prop = get_uint16 (data); switch (prop) { case prop_type: { zmq_assert (size == 4); // TODO: Check whether the type is OK. // uint16_t type = get_uint16 (data + 2); // ... break; }; case prop_identity: { peer_identity.assign (data + 2, size - 2); break; } default: zmq_assert (false); } if (!(msg_->flags () & msg_t::more)) { received = true; finalise_initialisation (); } return true; } void zmq::zmq_init_t::flush () { // Check if there's anything to flush. if (!received) return; // Initialization is done, dispatch engine. if (ephemeral_engine) dispatch_engine (); } void zmq::zmq_init_t::detach () { // This function is called by engine when disconnection occurs. // If there is an associated session, send it a null engine to let it know // that connection process was unsuccesful. if (session) send_attach (session, NULL, blob_t (), true); // The engine will destroy itself, so let's just drop the pointer here and // start termination of the init object. engine = NULL; terminate (); } void zmq::zmq_init_t::process_plug () { zmq_assert (engine); engine->plug (io_thread, this); } void zmq::zmq_init_t::process_unplug () { if (engine) engine->unplug (); } void zmq::zmq_init_t::finalise_initialisation () { // Unplug and prepare to dispatch engine. if (to_send.empty () && received) { ephemeral_engine = engine; engine = NULL; ephemeral_engine->unplug (); return; } } void zmq::zmq_init_t::dispatch_engine () { if (to_send.empty () && received) { // Engine must be detached. zmq_assert (!engine); zmq_assert (ephemeral_engine); // If we know what session we belong to, it's easy, just send the // engine to that session and destroy the init object. Note that we // know about the session only if this object is owned by it. Thus, // lifetime of this object in contained in the lifetime of the session // so the pointer cannot become invalid without notice. if (session) { send_attach (session, ephemeral_engine, peer_identity, true); terminate (); return; } // All the cases below are listener-based. Therefore we need the socket // reference so that new sessions can bind to that socket. zmq_assert (socket); // We have no associated session. If the peer has no identity we'll // create a transient session for the connection. Note that // seqnum is incremented to account for attach command before the // session is launched. That way we are sure it won't terminate before // being attached. if (peer_identity [0] == 0) { session = new (std::nothrow) transient_session_t (io_thread, socket, options); alloc_assert (session); session->inc_seqnum (); launch_sibling (session); send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } // Try to find the session corresponding to the peer's identity. // If found, send the engine to that session and destroy this object. // Note that session's seqnum is incremented by find_session rather // than by send_attach. session = socket->find_session (peer_identity); if (session) { send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } // There's no such named session. We have to create one. Note that // seqnum is incremented to account for attach command before the // session is launched. That way we are sure it won't terminate before // being attached. session = new (std::nothrow) named_session_t (io_thread, socket, options, peer_identity); alloc_assert (session); session->inc_seqnum (); launch_sibling (session); send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } }