From e645fc2693acc796304498909786b7b47005b429 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Mon, 23 Jan 2012 08:53:35 +0100 Subject: Imported Upstream version 2.1.3 --- src/session.cpp | 360 ++++++++++++++++++++++++++++++++------------------------ 1 file changed, 208 insertions(+), 152 deletions(-) (limited to 'src/session.cpp') diff --git a/src/session.cpp b/src/session.cpp index f798877..33c25d9 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -1,77 +1,88 @@ /* - Copyright (c) 2007-2010 iMatix Corporation + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by + the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. 0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. + GNU Lesser General Public License for more details. - You should have received a copy of the Lesser GNU General Public License + You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ -#include - #include "session.hpp" +#include "socket_base.hpp" #include "i_engine.hpp" #include "err.hpp" #include "pipe.hpp" +#include "likely.hpp" -zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_) : - owned_t (parent_, owner_), +zmq::session_t::session_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_) : + own_t (io_thread_, options_), + io_object_t (io_thread_), in_pipe (NULL), incomplete_in (false), - active (true), out_pipe (NULL), engine (NULL), - options (options_) + socket (socket_), + io_thread (io_thread_), + pipes_attached (false), + delimiter_processed (false), + force_terminate (false), + has_linger_timer (false), + state (active) { - // It's possible to register the session at this point as it will be - // searched for only on reconnect, i.e. no race condition (session found - // before it is plugged into it's I/O thread) is possible. - ordinal = owner->register_session (this); } -zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, const blob_t &peer_identity_) : - owned_t (parent_, owner_), - in_pipe (NULL), - incomplete_in (false), - active (true), - out_pipe (NULL), - engine (NULL), - ordinal (0), - peer_identity (peer_identity_), - options (options_) +zmq::session_t::~session_t () { - if (!peer_identity.empty () && peer_identity [0] != 0) { - if (!owner->register_session (peer_identity, this)) { + zmq_assert (!in_pipe); + zmq_assert (!out_pipe); - // TODO: There's already a session with the specified - // identity. We should presumably syslog it and drop the - // session. - zmq_assert (false); - } - } + if (engine) + engine->terminate (); } -zmq::session_t::~session_t () +void zmq::session_t::proceed_with_term () { - zmq_assert (!in_pipe); - zmq_assert (!out_pipe); + if (state == terminating) + return; + + zmq_assert (state == pending); + state = terminating; + + // If there's still a pending linger timer, remove it. + if (has_linger_timer) { + cancel_timer (linger_timer_id); + has_linger_timer = false; + } + + if (in_pipe) { + register_term_acks (1); + in_pipe->terminate (); + } + if (out_pipe) { + register_term_acks (1); + out_pipe->terminate (); + } + + // The session has already waited for the linger period. We don't want + // the child objects to linger any more thus linger is set to zero. + own_t::process_term (0); } bool zmq::session_t::read (::zmq_msg_t *msg_) { - if (!in_pipe || !active) + if (!in_pipe) return false; if (!in_pipe->read (msg_)) @@ -97,17 +108,8 @@ void zmq::session_t::flush () out_pipe->flush (); } -void zmq::session_t::detach (owned_t *reconnecter_) +void zmq::session_t::clean_pipes () { - // Plug in the reconnecter object if any. - if (reconnecter_) { - send_plug (reconnecter_); - send_own (owner, reconnecter_); - } - - // Engine is terminating itself. No need to deallocate it from here. - engine = NULL; - // Get rid of half-processed messages in the out pipe. Flush any // unflushed messages upstream. if (out_pipe) { @@ -127,165 +129,219 @@ void zmq::session_t::detach (owned_t *reconnecter_) zmq_msg_close (&msg); } } - - // Terminate transient session. - if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0)) - term (); -} - -zmq::io_thread_t *zmq::session_t::get_io_thread () -{ - return choose_io_thread (options.affinity); -} - -class zmq::socket_base_t *zmq::session_t::get_owner () -{ - return owner; -} - -uint64_t zmq::session_t::get_ordinal () -{ - zmq_assert (ordinal); - return ordinal; } void zmq::session_t::attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_) { + zmq_assert (!pipes_attached); + pipes_attached = true; + if (inpipe_) { zmq_assert (!in_pipe); in_pipe = inpipe_; - active = true; - in_pipe->set_endpoint (this); + in_pipe->set_event_sink (this); } if (outpipe_) { zmq_assert (!out_pipe); out_pipe = outpipe_; - out_pipe->set_endpoint (this); + out_pipe->set_event_sink (this); + } + + // If we are already terminating, terminate the pipes straight away. + if (state == terminating) { + if (in_pipe) { + in_pipe->terminate (); + register_term_acks (1); + } + if (out_pipe) { + out_pipe->terminate (); + register_term_acks (1); + } } } -void zmq::session_t::detach_inpipe (reader_t *pipe_) +void zmq::session_t::delimited (reader_t *pipe_) { - active = false; - in_pipe = NULL; + zmq_assert (in_pipe == pipe_); + zmq_assert (!delimiter_processed); + delimiter_processed = true; + + // If we are in process of being closed, but still waiting for all + // pending messeges being sent, we can terminate here. + if (state == pending) + proceed_with_term (); } -void zmq::session_t::detach_outpipe (writer_t *pipe_) +void zmq::session_t::terminated (reader_t *pipe_) { - out_pipe = NULL; + zmq_assert (in_pipe == pipe_); + in_pipe = NULL; + if (state == terminating) + unregister_term_ack (); } -void zmq::session_t::kill (reader_t *pipe_) +void zmq::session_t::terminated (writer_t *pipe_) { - active = false; + zmq_assert (out_pipe == pipe_); + out_pipe = NULL; + if (state == terminating) + unregister_term_ack (); } -void zmq::session_t::revive (reader_t *pipe_) +void zmq::session_t::activated (reader_t *pipe_) { zmq_assert (in_pipe == pipe_); - active = true; - if (engine) - engine->revive (); + + if (likely (engine != NULL)) + engine->activate_out (); + else + in_pipe->check_read (); } -void zmq::session_t::revive (writer_t *pipe_) +void zmq::session_t::activated (writer_t *pipe_) { zmq_assert (out_pipe == pipe_); if (engine) - engine->resume_input (); + engine->activate_in (); } void zmq::session_t::process_plug () { } -void zmq::session_t::process_unplug () +void zmq::session_t::process_attach (i_engine *engine_, + const blob_t &peer_identity_) { - // Unregister the session from the socket. - if (ordinal) - owner->unregister_session (ordinal); - else if (!peer_identity.empty () && peer_identity [0] != 0) - owner->unregister_session (peer_identity); - - // Ask associated pipes to terminate. - if (in_pipe) { - in_pipe->term (); - in_pipe = NULL; + // If some other object (e.g. init) notifies us that the connection failed + // we need to start the reconnection process. + if (!engine_) { + zmq_assert (!engine); + detached (); + return; } - if (out_pipe) { - out_pipe->term (); - out_pipe = NULL; + + // If we are already terminating, we destroy the engine straight away. + // Note that we don't have to unplug it before deleting as it's not + // yet plugged to the session. + if (state == terminating) { + delete engine_; + return; } + // If the session already has an engine attached, destroy new one. + // Note new engine is not plugged in yet, we don't have to unplug it. if (engine) { - engine->unplug (); - delete engine; - engine = NULL; + log ("DPID: duplicate peer identity - disconnecting peer"); + delete engine_; + return; + } + + // Check whether the required pipes already exist. If not so, we'll + // create them and bind them to the socket object. + if (!pipes_attached) { + zmq_assert (!in_pipe && !out_pipe); + pipes_attached = true; + reader_t *socket_reader = NULL; + writer_t *socket_writer = NULL; + + // Create the pipes, as required. + if (options.requires_in) { + create_pipe (socket, this, options.hwm, options.swap, &socket_reader, + &out_pipe); + out_pipe->set_event_sink (this); + } + if (options.requires_out) { + create_pipe (this, socket, options.hwm, options.swap, &in_pipe, + &socket_writer); + in_pipe->set_event_sink (this); + } + + // Bind the pipes to the socket object. + if (socket_reader || socket_writer) + send_bind (socket, socket_reader, socket_writer, peer_identity_); } + + // Plug in the engine. + engine = engine_; + engine->plug (io_thread, this); + + // Trigger the notfication about the attachment. + attached (peer_identity_); } -void zmq::session_t::process_attach (i_engine *engine_, - const blob_t &peer_identity_) +void zmq::session_t::detach () { - if (!peer_identity.empty ()) { - - // If both IDs are temporary, no checking is needed. - // TODO: Old ID should be reused in this case... - if (peer_identity.empty () || peer_identity [0] != 0 || - peer_identity_.empty () || peer_identity_ [0] != 0) { + // Engine is dead. Let's forget about it. + engine = NULL; - // If we already know the peer name do nothing, just check whether - // it haven't changed. - zmq_assert (peer_identity == peer_identity_); - } - } - else if (!peer_identity_.empty ()) { + // Remove any half-done messages from the pipes. + clean_pipes (); - // Store the peer identity. - peer_identity = peer_identity_; + // Send the event to the derived class. + detached (); - // If the session is not registered with the ordinal, let's register - // it using the peer name. - if (!ordinal) { - if (!owner->register_session (peer_identity, this)) { + // Just in case, there's only a delimiter in the inbound pipe. + if (in_pipe) + in_pipe->check_read (); +} - // TODO: There's already a session with the specified - // identity. We should presumably syslog it and drop the - // session. - zmq_assert (false); - } - } +void zmq::session_t::process_term (int linger_) +{ + zmq_assert (state == active); + state = pending; + + // If linger is set to zero, we can terminate the session straight away + // not waiting for the pending messages to be sent. + if (linger_ == 0) { + proceed_with_term (); + return; } - // Check whether the required pipes already exist. If not so, we'll - // create them and bind them to the socket object. - reader_t *socket_reader = NULL; - writer_t *socket_writer = NULL; - - if (options.requires_in && !out_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm, options.swap); - zmq_assert (pipe); - out_pipe = &pipe->writer; - out_pipe->set_endpoint (this); - socket_reader = &pipe->reader; + // If there's finite linger value, set up a timer. + if (linger_ > 0) { + zmq_assert (!has_linger_timer); + add_timer (linger_, linger_timer_id); + has_linger_timer = true; } - if (options.requires_out && !in_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm, options.swap); - zmq_assert (pipe); - in_pipe = &pipe->reader; - in_pipe->set_endpoint (this); - socket_writer = &pipe->writer; - } + // If there's no engine and there's only delimiter in the pipe it wouldn't + // be ever read. Thus we check for it explicitly. + if (in_pipe) + in_pipe->check_read (); + + // If there's no in pipe there are no pending messages to send. + // We can proceed with the shutdown straight away. Also, if there is + // inbound pipe, but the delimiter was already processed, we can + // terminate immediately. Alternatively, if the derived session type have + // called 'terminate' we'll finish straight away. + if (!options.requires_out || delimiter_processed || force_terminate || + (!options.immediate_connect && !in_pipe)) + proceed_with_term (); +} - if (socket_reader || socket_writer) - send_bind (owner, socket_reader, socket_writer, peer_identity); +void zmq::session_t::timer_event (int id_) +{ + // Linger period expired. We can proceed with termination even though + // there are still pending messages to be sent. + zmq_assert (id_ == linger_timer_id); + has_linger_timer = false; + proceed_with_term (); +} - // Plug in the engine. - zmq_assert (!engine); - zmq_assert (engine_); - engine = engine_; - engine->plug (this); +bool zmq::session_t::register_session (const blob_t &name_, session_t *session_) +{ + return socket->register_session (name_, session_); +} + +void zmq::session_t::unregister_session (const blob_t &name_) +{ + socket->unregister_session (name_); +} + +void zmq::session_t::terminate () +{ + force_terminate = true; + own_t::terminate (); } -- cgit v1.2.3