summaryrefslogtreecommitdiff
path: root/src/session.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/session.cpp')
-rw-r--r--src/session.cpp273
1 files changed, 273 insertions, 0 deletions
diff --git a/src/session.cpp b/src/session.cpp
new file mode 100644
index 0000000..63868b2
--- /dev/null
+++ b/src/session.cpp
@@ -0,0 +1,273 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../include/zs.h"
+
+#include "session.hpp"
+#include "i_engine.hpp"
+#include "i_thread.hpp"
+#include "i_mux.hpp"
+#include "i_demux.hpp"
+#include "err.hpp"
+#include "pipe.hpp"
+#include "pipe_reader.hpp"
+#include "pipe_writer.hpp"
+#include "simple_semaphore.hpp"
+
+zs::session_t::session_t (object_t *parent_, i_thread *thread_,
+ i_mux *mux_, i_demux *demux_,
+ bool terminate_on_disconnect_, bool terminate_on_no_pipes_) :
+ safe_object_t (parent_),
+ mux (mux_),
+ demux (demux_),
+ thread (thread_),
+ engine (NULL),
+ terminate_on_disconnect (terminate_on_disconnect_),
+ terminate_on_no_pipes (false),
+ terminate_on_no_pipes_delayed (terminate_on_no_pipes_),
+ index (-1)
+{
+ // At least one way to terminate the session should be allowed. Otherwise
+ // the session can be orphaned forever.
+ zs_assert (terminate_on_disconnect || terminate_on_no_pipes_delayed);
+
+ // Give the mux and the demux callback pointer to ourselves.
+ if (mux)
+ mux->set_session (this);
+ if (demux)
+ demux->set_session (this);
+}
+
+void zs::session_t::shutdown ()
+{
+ // Session may live even without an associated engine, thus we have
+ // to check if for NULL value.
+ if (engine)
+ engine->shutdown ();
+
+ // Propagate the shutdown signal to both inbound and outbound pipes.
+ if (mux)
+ mux->shutdown ();
+ if (demux)
+ demux->shutdown ();
+
+ delete this;
+}
+
+void zs::session_t::disconnected ()
+{
+ // It's engine who calls this function so there's no need to deallocate
+ // the engine. Just drop the reference.
+ engine = NULL;
+
+ // Some sessions won't shut down because of disconnect. New engine will
+ // attached to the session later on.
+ if (!terminate_on_disconnect)
+ return;
+
+ terminate ();
+}
+
+void zs::session_t::bind (object_t *peer_, bool in_, bool out_)
+{
+ // Create the out pipe (if required).
+ pipe_reader_t *pipe_reader = NULL;
+ if (out_) {
+ pipe_writer_t *pipe_writer;
+ create_pipe (peer_, this, 0, 0, &pipe_reader, &pipe_writer);
+ demux->attach_pipe (pipe_writer);
+
+ // There's at least one pipe attached. We can deallocate the object
+ // when there are no pipes (if required).
+ terminate_on_no_pipes = terminate_on_no_pipes_delayed;
+ }
+
+ // Ask peer to attach to the out pipe (if one exists). If required, ask
+ // it to create a pipe in opposite direction. It's assumed that peer's
+ // seqnum was already incremented, so we don't need to care whether it's
+ // alive at the moment.
+ if (in_)
+ inc_seqnum ();
+ send_bind (peer_, pipe_reader, in_ ? this : NULL);
+}
+
+void zs::session_t::revive ()
+{
+ if (engine)
+ engine->revive ();
+}
+
+void zs::session_t::terminate ()
+{
+ // Terminate is always called by engine, thus it'll terminate itself,
+ // we just have to drop the pointer.
+ engine = NULL;
+
+ // Propagate the terminate signal to both inbound and outbound pipes.
+ if (mux) {
+ mux->terminate ();
+ mux = NULL;
+ }
+ if (demux) {
+ demux->terminate ();
+ demux = NULL;
+ }
+
+ // Session cannot be deallocated at this point. There can still be
+ // pending commands to process. Unregister session from global
+ // repository thus ensuring that no new commands will be sent.
+ unregister_inproc_endpoints (this);
+
+ // Move to terminating state.
+ safe_object_t::terminate ();
+}
+
+zs::session_t::~session_t ()
+{
+ // When session is actually deallocated it unregisters from its thread.
+ // Unregistration cannot be done earlier as it would result in memory
+ // leak if global shutdown happens in the middle of session termination.
+ thread->detach_session (this);
+}
+
+void zs::session_t::set_engine (i_engine *engine_)
+{
+ zs_assert (!engine || !engine_);
+ engine = engine_;
+}
+
+void zs::session_t::set_index (int index_)
+{
+ index = index_;
+}
+
+int zs::session_t::get_index ()
+{
+ return index;
+}
+
+bool zs::session_t::write (zs_msg *msg_)
+{
+ return demux->send (msg_);
+}
+
+void zs::session_t::flush ()
+{
+ demux->flush ();
+}
+
+bool zs::session_t::read (zs_msg *msg_)
+{
+ bool retrieved = mux->recv (msg_);
+ if (terminate_on_no_pipes && mux->empty () && demux->empty ()) {
+ zs_assert (engine);
+ engine->schedule_terminate ();
+ terminate ();
+ }
+ return retrieved;
+}
+
+void zs::session_t::process_bind (pipe_reader_t *reader_, session_t *peer_)
+{
+ if (is_terminating ()) {
+
+ // If session is already in termination phase, we'll ask newly arrived
+ // pipe reader & writer to terminate straight away.
+ if (reader_)
+ reader_->terminate ();
+
+ // Peer session has already incremented its seqnum. We have to send
+ // a dummy command to avoid a memory leak.
+ if (peer_)
+ send_bind (peer_, NULL, NULL);
+
+ return;
+ }
+
+ // If inbound pipe is provided, bind it to the mux.
+ if (reader_) {
+ mux->attach_pipe (reader_);
+
+ // There's at least one pipe attached. We can deallocate the object
+ // when there are no pipes (if required).
+ terminate_on_no_pipes = terminate_on_no_pipes_delayed;
+ }
+
+ // If peer wants to get messages from ourselves, we'll bind to it.
+ if (peer_) {
+ pipe_reader_t *pipe_reader;
+ pipe_writer_t *pipe_writer;
+ create_pipe (peer_, this, 0, 0, &pipe_reader, &pipe_writer);
+ demux->attach_pipe (pipe_writer);
+ send_bind (peer_, pipe_reader, NULL);
+
+ // There's at least one pipe attached. We can deallocate the object
+ // when there are no pipes (if required).
+ terminate_on_no_pipes = terminate_on_no_pipes_delayed;
+ }
+}
+
+void zs::session_t::process_reg (simple_semaphore_t *smph_)
+{
+ zs_assert (!is_terminating ());
+
+ // Add the session to the list of sessions associated with this I/O thread.
+ // This way the session will be deallocated on the terminal shutdown.
+ thread->attach_session (this);
+
+ // Release calling thead (if required).
+ if (smph_)
+ smph_->post ();
+}
+
+void zs::session_t::process_reg_and_bind (session_t *peer_,
+ bool flow_in_, bool flow_out_)
+{
+ zs_assert (!is_terminating ());
+
+ // Add the session to the list of sessions associated with this I/O thread.
+ // This way the session will be deallocated on the terminal shutdown.
+ thread->attach_session (this);
+
+ // Bind to the peer. Note that caller have already incremented command
+ // sequence number of the peer so we are sure it still exists.
+ pipe_reader_t *pipe_reader = NULL;
+ if (flow_out_) {
+ pipe_writer_t *pipe_writer;
+ create_pipe (peer_, this, 0, 0, &pipe_reader, &pipe_writer);
+ demux->attach_pipe (pipe_writer);
+
+ // There's at least one pipe attached. We can deallocate the object
+ // when there are no pipes (if required).
+ terminate_on_no_pipes = terminate_on_no_pipes_delayed;
+ }
+ send_bind (peer_, pipe_reader, flow_in_ ? this : NULL);
+}
+
+void zs::session_t::process_engine (i_engine *engine_)
+{
+ if (is_terminating ()) {
+
+ // Kill the engine. It won't be needed anymore.
+ engine_->terminate ();
+ return;
+ }
+
+ engine_->attach (thread->get_poller (), this);
+}