diff options
Diffstat (limited to 'src/session.cpp')
-rw-r--r-- | src/session.cpp | 273 |
1 files changed, 0 insertions, 273 deletions
diff --git a/src/session.cpp b/src/session.cpp deleted file mode 100644 index b9a450d..0000000 --- a/src/session.cpp +++ /dev/null @@ -1,273 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "../include/zmq.h" - -#include "session.hpp" -#include "i_engine.hpp" -#include "i_thread.hpp" -#include "i_mux.hpp" -#include "i_demux.hpp" -#include "err.hpp" -#include "pipe.hpp" -#include "pipe_reader.hpp" -#include "pipe_writer.hpp" -#include "simple_semaphore.hpp" - -zmq::session_t::session_t (object_t *parent_, i_thread *thread_, - i_mux *mux_, i_demux *demux_, - bool terminate_on_disconnect_, bool terminate_on_no_pipes_) : - safe_object_t (parent_), - mux (mux_), - demux (demux_), - thread (thread_), - engine (NULL), - terminate_on_disconnect (terminate_on_disconnect_), - terminate_on_no_pipes (false), - terminate_on_no_pipes_delayed (terminate_on_no_pipes_), - index (-1) -{ - // At least one way to terminate the session should be allowed. Otherwise - // the session can be orphaned forever. - zmq_assert (terminate_on_disconnect || terminate_on_no_pipes_delayed); - - // Give the mux and the demux callback pointer to ourselves. - if (mux) - mux->set_session (this); - if (demux) - demux->set_session (this); -} - -void zmq::session_t::shutdown () -{ - // Session may live even without an associated engine, thus we have - // to check if for NULL value. - if (engine) - engine->shutdown (); - - // Propagate the shutdown signal to both inbound and outbound pipes. - if (mux) - mux->shutdown (); - if (demux) - demux->shutdown (); - - delete this; -} - -void zmq::session_t::disconnected () -{ - // It's engine who calls this function so there's no need to deallocate - // the engine. Just drop the reference. - engine = NULL; - - // Some sessions won't shut down because of disconnect. New engine will - // attached to the session later on. - if (!terminate_on_disconnect) - return; - - terminate (); -} - -void zmq::session_t::bind (object_t *peer_, bool in_, bool out_) -{ - // Create the out pipe (if required). - pipe_reader_t *pipe_reader = NULL; - if (out_) { - pipe_writer_t *pipe_writer; - create_pipe (peer_, this, 0, 0, &pipe_reader, &pipe_writer); - demux->attach_pipe (pipe_writer); - - // There's at least one pipe attached. We can deallocate the object - // when there are no pipes (if required). - terminate_on_no_pipes = terminate_on_no_pipes_delayed; - } - - // Ask peer to attach to the out pipe (if one exists). If required, ask - // it to create a pipe in opposite direction. It's assumed that peer's - // seqnum was already incremented, so we don't need to care whether it's - // alive at the moment. - if (in_) - inc_seqnum (); - send_bind (peer_, pipe_reader, in_ ? this : NULL); -} - -void zmq::session_t::revive () -{ - if (engine) - engine->revive (); -} - -void zmq::session_t::terminate () -{ - // Terminate is always called by engine, thus it'll terminate itself, - // we just have to drop the pointer. - engine = NULL; - - // Propagate the terminate signal to both inbound and outbound pipes. - if (mux) { - mux->terminate (); - mux = NULL; - } - if (demux) { - demux->terminate (); - demux = NULL; - } - - // Session cannot be deallocated at this point. There can still be - // pending commands to process. Unregister session from global - // repository thus ensuring that no new commands will be sent. - unregister_inproc_endpoints (this); - - // Move to terminating state. - safe_object_t::terminate (); -} - -zmq::session_t::~session_t () -{ - // When session is actually deallocated it unregisters from its thread. - // Unregistration cannot be done earlier as it would result in memory - // leak if global shutdown happens in the middle of session termination. - thread->detach_session (this); -} - -void zmq::session_t::set_engine (i_engine *engine_) -{ - zmq_assert (!engine || !engine_); - engine = engine_; -} - -void zmq::session_t::set_index (int index_) -{ - index = index_; -} - -int zmq::session_t::get_index () -{ - return index; -} - -bool zmq::session_t::write (zmq_msg *msg_) -{ - return demux->send (msg_); -} - -void zmq::session_t::flush () -{ - demux->flush (); -} - -bool zmq::session_t::read (zmq_msg *msg_) -{ - bool retrieved = mux->recv (msg_); - if (terminate_on_no_pipes && mux->empty () && demux->empty ()) { - zmq_assert (engine); - engine->schedule_terminate (); - terminate (); - } - return retrieved; -} - -void zmq::session_t::process_bind (pipe_reader_t *reader_, session_t *peer_) -{ - if (is_terminating ()) { - - // If session is already in termination phase, we'll ask newly arrived - // pipe reader & writer to terminate straight away. - if (reader_) - reader_->terminate (); - - // Peer session has already incremented its seqnum. We have to send - // a dummy command to avoid a memory leak. - if (peer_) - send_bind (peer_, NULL, NULL); - - return; - } - - // If inbound pipe is provided, bind it to the mux. - if (reader_) { - mux->attach_pipe (reader_); - - // There's at least one pipe attached. We can deallocate the object - // when there are no pipes (if required). - terminate_on_no_pipes = terminate_on_no_pipes_delayed; - } - - // If peer wants to get messages from ourselves, we'll bind to it. - if (peer_) { - pipe_reader_t *pipe_reader; - pipe_writer_t *pipe_writer; - create_pipe (peer_, this, 0, 0, &pipe_reader, &pipe_writer); - demux->attach_pipe (pipe_writer); - send_bind (peer_, pipe_reader, NULL); - - // There's at least one pipe attached. We can deallocate the object - // when there are no pipes (if required). - terminate_on_no_pipes = terminate_on_no_pipes_delayed; - } -} - -void zmq::session_t::process_reg (simple_semaphore_t *smph_) -{ - zmq_assert (!is_terminating ()); - - // Add the session to the list of sessions associated with this I/O thread. - // This way the session will be deallocated on the terminal shutdown. - thread->attach_session (this); - - // Release calling thead (if required). - if (smph_) - smph_->post (); -} - -void zmq::session_t::process_reg_and_bind (session_t *peer_, - bool flow_in_, bool flow_out_) -{ - zmq_assert (!is_terminating ()); - - // Add the session to the list of sessions associated with this I/O thread. - // This way the session will be deallocated on the terminal shutdown. - thread->attach_session (this); - - // Bind to the peer. Note that caller have already incremented command - // sequence number of the peer so we are sure it still exists. - pipe_reader_t *pipe_reader = NULL; - if (flow_out_) { - pipe_writer_t *pipe_writer; - create_pipe (peer_, this, 0, 0, &pipe_reader, &pipe_writer); - demux->attach_pipe (pipe_writer); - - // There's at least one pipe attached. We can deallocate the object - // when there are no pipes (if required). - terminate_on_no_pipes = terminate_on_no_pipes_delayed; - } - send_bind (peer_, pipe_reader, flow_in_ ? this : NULL); -} - -void zmq::session_t::process_engine (i_engine *engine_) -{ - if (is_terminating ()) { - - // Kill the engine. It won't be needed anymore. - engine_->terminate (); - return; - } - - engine_->attach (thread->get_poller (), this); -} |