diff options
Diffstat (limited to 'src/zmq_init.cpp')
-rw-r--r-- | src/zmq_init.cpp | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp new file mode 100644 index 0000000..e526b34 --- /dev/null +++ b/src/zmq_init.cpp @@ -0,0 +1,195 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "zmq_init.hpp" +#include "zmq_engine.hpp" +#include "io_thread.hpp" +#include "session.hpp" +#include "err.hpp" + +zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_, + fd_t fd_, const options_t &options_, bool reconnect_, + const char *address_, uint64_t session_ordinal_) : + owned_t (parent_, owner_), + sent (false), + received (false), + session_ordinal (session_ordinal_), + options (options_) +{ + // Create the engine object for this connection. + engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, + reconnect_, address_); + zmq_assert (engine); +} + +zmq::zmq_init_t::~zmq_init_t () +{ + if (engine) + delete engine; +} + +bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) +{ + // If the identity was already sent, do nothing. + if (sent) + return false; + + // Send the identity. + int rc = zmq_msg_init_size (msg_, options.identity.size ()); + zmq_assert (rc == 0); + memcpy (zmq_msg_data (msg_), options.identity.c_str (), + options.identity.size ()); + sent = true; + + // If initialisation is done, pass the engine to the session and + // destroy the init object. + finalise (); + + return true; +} + +bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) +{ + // If identity was already received, we are not interested + // in subsequent messages. + if (received) + return false; + + // Retreieve the remote identity. + peer_identity.assign ((const char*) zmq_msg_data (msg_), + zmq_msg_size (msg_)); + received = true; + + return true; +} + +void zmq::zmq_init_t::flush () +{ + // Check if there's anything to flush. + if (!received) + return; + + // If initialisation is done, pass the engine to the session and + // destroy the init object. + finalise (); +} + +void zmq::zmq_init_t::detach (owned_t *reconnecter_) +{ + // This function is called by engine when disconnection occurs. + + // If required, launch the reconnecter. + if (reconnecter_) { + send_plug (reconnecter_); + send_own (owner, reconnecter_); + } + + // The engine will destroy itself, so let's just drop the pointer here and + // start termination of the init object. + engine = NULL; + term (); +} + +zmq::io_thread_t *zmq::zmq_init_t::get_io_thread () +{ + return choose_io_thread (options.affinity); +} + +class zmq::socket_base_t *zmq::zmq_init_t::get_owner () +{ + return owner; +} + +uint64_t zmq::zmq_init_t::get_ordinal () +{ + zmq_assert (false); +} + +void zmq::zmq_init_t::process_plug () +{ + zmq_assert (engine); + engine->plug (this); +} + +void zmq::zmq_init_t::process_unplug () +{ + if (engine) + engine->unplug (); +} + +void zmq::zmq_init_t::finalise () +{ + if (sent && received) { + + // Disconnect the engine from the init object. + engine->unplug (); + + session_t *session = NULL; + + // If we have the session ordinal, let's use it to find the session. + // If it is not found, it means socket is already being shut down + // and the session have been deallocated. + // TODO: We should check whether the name of the peer haven't changed + // upon reconnection. + if (session_ordinal) { + session = owner->find_session (session_ordinal); + if (!session) { + term (); + return; + } + } + + // If the peer has a unique name, find the associated session. If it + // doesn't exist, create it. + else if (!peer_identity.empty ()) { + session = owner->find_session (peer_identity.c_str ()); + if (!session) { + session = new (std::nothrow) session_t ( + choose_io_thread (options.affinity), owner, options, + peer_identity.c_str ()); + zmq_assert (session); + send_plug (session); + send_own (owner, session); + + // Reserve a sequence number for following 'attach' command. + session->inc_seqnum (); + } + } + + // If the other party has no specific identity, let's create a + // transient session. + else { + session = new (std::nothrow) session_t ( + choose_io_thread (options.affinity), owner, options, NULL); + zmq_assert (session); + send_plug (session); + send_own (owner, session); + + // Reserve a sequence number for following 'attach' command. + session->inc_seqnum (); + } + + // No need to increment seqnum as it was laready incremented above. + send_attach (session, engine, false); + + // Destroy the init object. + engine = NULL; + term (); + } +} |