From f78d9b6bfca13e298c29fadabbbc870b37a0a573 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 15 Sep 2011 10:00:23 +0200 Subject: Session class separated into socket-type-specific sessions This is a preliminary patch allowing for socket-type-specific functionality in the I/O thread. For example, message format can be checked asynchronously and misbehaved connections dropped straight away. Signed-off-by: Martin Sustrik --- src/session.cpp | 383 -------------------------------------------------------- 1 file changed, 383 deletions(-) delete mode 100644 src/session.cpp (limited to 'src/session.cpp') diff --git a/src/session.cpp b/src/session.cpp deleted file mode 100644 index 8001ba8..0000000 --- a/src/session.cpp +++ /dev/null @@ -1,383 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "session.hpp" -#include "socket_base.hpp" -#include "i_engine.hpp" -#include "err.hpp" -#include "pipe.hpp" -#include "likely.hpp" -#include "tcp_connecter.hpp" -#include "ipc_connecter.hpp" -#include "vtcp_connecter.hpp" -#include "pgm_sender.hpp" -#include "pgm_receiver.hpp" - -zmq::session_t::session_t (class io_thread_t *io_thread_, bool connect_, - class socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) : - own_t (io_thread_, options_), - io_object_t (io_thread_), - connect (connect_), - pipe (NULL), - incomplete_in (false), - pending (false), - engine (NULL), - socket (socket_), - io_thread (io_thread_), - has_linger_timer (false) -{ - if (protocol_) - protocol = protocol_; - if (address_) - address = address_; -} - -zmq::session_t::~session_t () -{ - zmq_assert (!pipe); - - // If there's still a pending linger timer, remove it. - if (has_linger_timer) { - cancel_timer (linger_timer_id); - has_linger_timer = false; - } - - // Close the engine. - if (engine) - engine->terminate (); -} - -void zmq::session_t::attach_pipe (pipe_t *pipe_) -{ - zmq_assert (!is_terminating ()); - zmq_assert (!pipe); - zmq_assert (pipe_); - pipe = pipe_; - pipe->set_event_sink (this); -} - -bool zmq::session_t::read (msg_t *msg_) -{ - if (!pipe) - return false; - - if (!pipe->read (msg_)) - return false; - - incomplete_in = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; - return true; -} - -bool zmq::session_t::write (msg_t *msg_) -{ - if (pipe && pipe->write (msg_)) { - int rc = msg_->init (); - errno_assert (rc == 0); - return true; - } - - return false; -} - -void zmq::session_t::flush () -{ - if (pipe) - pipe->flush (); -} - -void zmq::session_t::clean_pipes () -{ - if (pipe) { - - // Get rid of half-processed messages in the out pipe. Flush any - // unflushed messages upstream. - pipe->rollback (); - pipe->flush (); - - // Remove any half-read message from the in pipe. - while (incomplete_in) { - msg_t msg; - int rc = msg.init (); - errno_assert (rc == 0); - if (!read (&msg)) { - zmq_assert (!incomplete_in); - break; - } - rc = msg.close (); - errno_assert (rc == 0); - } - } -} - -void zmq::session_t::terminated (pipe_t *pipe_) -{ - // Drop the reference to the deallocated pipe. - zmq_assert (pipe == pipe_); - pipe = NULL; - - // If we are waiting for pending messages to be sent, at this point - // we are sure that there will be no more messages and we can proceed - // with termination safely. - if (pending) - proceed_with_term (); -} - -void zmq::session_t::read_activated (pipe_t *pipe_) -{ - zmq_assert (pipe == pipe_); - - if (likely (engine != NULL)) - engine->activate_out (); - else - pipe->check_read (); -} - -void zmq::session_t::write_activated (pipe_t *pipe_) -{ - zmq_assert (pipe == pipe_); - - if (engine) - engine->activate_in (); -} - -void zmq::session_t::hiccuped (pipe_t *pipe_) -{ - // Hiccups are always sent from session to socket, not the other - // way round. - zmq_assert (false); -} - -void zmq::session_t::process_plug () -{ - if (connect) - start_connecting (false); -} - -void zmq::session_t::process_attach (i_engine *engine_) -{ - // If some other object (e.g. init) notifies us that the connection failed - // without creating an engine we need to start the reconnection process. - if (!engine_) { - zmq_assert (!engine); - detached (); - return; - } - - // Create the pipe if it does not exist yet. - if (!pipe && !is_terminating ()) { - object_t *parents [2] = {this, socket}; - pipe_t *pipes [2] = {NULL, NULL}; - int hwms [2] = {options.rcvhwm, options.sndhwm}; - bool delays [2] = {options.delay_on_close, options.delay_on_disconnect}; - int rc = pipepair (parents, pipes, hwms, delays); - errno_assert (rc == 0); - - // Plug the local end of the pipe. - pipes [0]->set_event_sink (this); - - // Remember the local end of the pipe. - zmq_assert (!pipe); - pipe = pipes [0]; - - // Ask socket to plug into the remote end of the pipe. - send_bind (socket, pipes [1]); - } - - // Plug in the engine. - zmq_assert (!engine); - engine = engine_; - engine->plug (io_thread, this); -} - -void zmq::session_t::detach () -{ - // Engine is dead. Let's forget about it. - engine = NULL; - - // Remove any half-done messages from the pipes. - clean_pipes (); - - // Send the event to the derived class. - detached (); - - // Just in case there's only a delimiter in the pipe. - if (pipe) - pipe->check_read (); -} - -void zmq::session_t::process_term (int linger_) -{ - zmq_assert (!pending); - - // If the termination of the pipe happens before the term command is - // delivered there's nothing much to do. We can proceed with the - // stadard termination immediately. - if (!pipe) { - proceed_with_term (); - return; - } - - pending = true; - - // If there's finite linger value, delay the termination. - // If linger is infinite (negative) we don't even have to set - // the timer. - if (linger_ > 0) { - zmq_assert (!has_linger_timer); - add_timer (linger_, linger_timer_id); - has_linger_timer = true; - } - - // Start pipe termination process. Delay the termination till all messages - // are processed in case the linger time is non-zero. - pipe->terminate (linger_ != 0); - - // TODO: Should this go into pipe_t::terminate ? - // In case there's no engine and there's only delimiter in the - // pipe it wouldn't be ever read. Thus we check for it explicitly. - pipe->check_read (); -} - -void zmq::session_t::proceed_with_term () -{ - // The pending phase have just ended. - pending = false; - - // Continue with standard termination. - own_t::process_term (0); -} - -void zmq::session_t::timer_event (int id_) -{ - // Linger period expired. We can proceed with termination even though - // there are still pending messages to be sent. - zmq_assert (id_ == linger_timer_id); - has_linger_timer = false; - - // Ask pipe to terminate even though there may be pending messages in it. - zmq_assert (pipe); - pipe->terminate (false); -} - -void zmq::session_t::detached () -{ - // Transient session self-destructs after peer disconnects. - if (!connect) { - terminate (); - return; - } - - // Reconnect. - start_connecting (true); - - // For subscriber sockets we hiccup the inbound pipe, which will cause - // the socket object to resend all the subscriptions. - if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) - pipe->hiccup (); -} - -void zmq::session_t::start_connecting (bool wait_) -{ - zmq_assert (connect); - - // Choose I/O thread to run connecter in. Given that we are already - // running in an I/O thread, there must be at least one available. - io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_assert (io_thread); - - // Create the connecter object. - - if (protocol == "tcp") { - tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t ( - io_thread, this, options, address.c_str (), wait_); - alloc_assert (connecter); - launch_child (connecter); - return; - } - -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS - if (protocol == "ipc") { - ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t ( - io_thread, this, options, address.c_str (), wait_); - alloc_assert (connecter); - launch_child (connecter); - return; - } -#endif - -#if defined ZMQ_HAVE_VTCP - if (protocol == "vtcp") { - - vtcp_connecter_t *connecter = new (std::nothrow) vtcp_connecter_t ( - io_thread, this, options, address.c_str (), - wait_); - alloc_assert (connecter); - launch_child (connecter); - return; - } -#endif - -#if defined ZMQ_HAVE_OPENPGM - - // Both PGM and EPGM transports are using the same infrastructure. - if (protocol == "pgm" || protocol == "epgm") { - - // For EPGM transport with UDP encapsulation of PGM is used. - bool udp_encapsulation = (protocol == "epgm"); - - // At this point we'll create message pipes to the session straight - // away. There's no point in delaying it as no concept of 'connect' - // exists with PGM anyway. - if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { - - // PGM sender. - pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( - io_thread, options); - alloc_assert (pgm_sender); - - int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); - zmq_assert (rc == 0); - - send_attach (this, pgm_sender); - } - else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) { - - // PGM receiver. - pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( - io_thread, options); - alloc_assert (pgm_receiver); - - int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); - zmq_assert (rc == 0); - - send_attach (this, pgm_receiver); - } - else - zmq_assert (false); - - return; - } -#endif - - zmq_assert (false); -} - -- cgit v1.2.3