From d13933bc62fce71b5a58118020e0dd3776e79aa9 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 11 Aug 2010 14:09:56 +0200 Subject: I/O object hierarchy implemented --- src/session.cpp | 161 ++++++++++++++++++++------------------------------------ 1 file changed, 57 insertions(+), 104 deletions(-) (limited to 'src/session.cpp') diff --git a/src/session.cpp b/src/session.cpp index 3c74898..0494ff1 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -20,47 +20,25 @@ #include #include "session.hpp" +#include "socket_base.hpp" #include "i_engine.hpp" #include "err.hpp" #include "pipe.hpp" -zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_) : - owned_t (parent_, owner_), +zmq::session_t::session_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_) : + own_t (io_thread_), + options (options_), in_pipe (NULL), incomplete_in (false), active (true), out_pipe (NULL), engine (NULL), - options (options_) + socket (socket_), + io_thread (io_thread_), + attach_processed (false), + term_processed (false) { - // It's possible to register the session at this point as it will be - // searched for only on reconnect, i.e. no race condition (session found - // before it is plugged into it's I/O thread) is possible. - ordinal = owner->register_session (this); -} - -zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, const blob_t &peer_identity_) : - owned_t (parent_, owner_), - in_pipe (NULL), - incomplete_in (false), - active (true), - out_pipe (NULL), - engine (NULL), - ordinal (0), - peer_identity (peer_identity_), - options (options_) -{ - if (!peer_identity.empty () && peer_identity [0] != 0) { - if (!owner->register_session (peer_identity, this)) { - - // TODO: There's already a session with the specified - // identity. We should presumably syslog it and drop the - // session. - zmq_assert (false); - } - } } zmq::session_t::~session_t () @@ -69,10 +47,10 @@ zmq::session_t::~session_t () zmq_assert (!out_pipe); } -bool zmq::session_t::is_terminable () +void zmq::session_t::terminate () { - // The session won't send term_ack until both in & out pipe are closed. - return !in_pipe && !out_pipe; + // TODO: + zmq_assert (false); } bool zmq::session_t::read (::zmq_msg_t *msg_) @@ -105,17 +83,8 @@ void zmq::session_t::flush () out_pipe->flush (); } -void zmq::session_t::detach (owned_t *reconnecter_) +void zmq::session_t::clean_pipes () { - // Plug in the reconnecter object if any. - if (reconnecter_) { - send_plug (reconnecter_); - send_own (owner, reconnecter_); - } - - // Engine is terminating itself. No need to deallocate it from here. - engine = NULL; - // Get rid of half-processed messages in the out pipe. Flush any // unflushed messages upstream. if (out_pipe) { @@ -135,26 +104,6 @@ void zmq::session_t::detach (owned_t *reconnecter_) zmq_msg_close (&msg); } } - - // Terminate transient session. - if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0)) - term (); -} - -zmq::io_thread_t *zmq::session_t::get_io_thread () -{ - return choose_io_thread (options.affinity); -} - -class zmq::socket_base_t *zmq::session_t::get_owner () -{ - return owner; -} - -uint64_t zmq::session_t::get_ordinal () -{ - zmq_assert (ordinal); - return ordinal; } void zmq::session_t::attach_pipes (class reader_t *inpipe_, @@ -172,6 +121,9 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_, out_pipe = outpipe_; out_pipe->set_event_sink (this); } + + attach_processed = true; + finalise (); } void zmq::session_t::terminated (reader_t *pipe_) @@ -192,14 +144,14 @@ void zmq::session_t::activated (reader_t *pipe_) zmq_assert (in_pipe == pipe_); active = true; if (engine) - engine->revive (); + engine->activate_out (); } void zmq::session_t::activated (writer_t *pipe_) { zmq_assert (out_pipe == pipe_); if (engine) - engine->resume_input (); + engine->activate_in (); } void zmq::session_t::process_plug () @@ -214,10 +166,9 @@ void zmq::session_t::process_unplug () // there may be some commands being sent to the session right now. // Unregister the session from the socket. - if (ordinal) - owner->unregister_session (ordinal); - else if (!peer_identity.empty () && peer_identity [0] != 0) - owner->unregister_session (peer_identity); +// if (!peer_identity.empty () && peer_identity [0] != 0) +// unregister_session (peer_identity); +// TODO: Should be done in named session. // Ask associated pipes to terminate. if (in_pipe) @@ -232,63 +183,65 @@ void zmq::session_t::process_unplug () } } +void zmq::session_t::finalise () +{ + // If all conditions are met, proceed with termination: + // 1. Owner object already asked us to terminate. + // 2. The pipes were already attached to the session. + // 3. Both pipes have already terminated. Note that inbound pipe + // is terminated after delimiter is read, i.e. all messages + // were already sent to the wire. + if (term_processed && attach_processed && !in_pipe && !out_pipe) + own_t::process_term (); +} + void zmq::session_t::process_attach (i_engine *engine_, const blob_t &peer_identity_) { - if (!peer_identity.empty ()) { - - // If both IDs are temporary, no checking is needed. - // TODO: Old ID should be reused in this case... - if (peer_identity.empty () || peer_identity [0] != 0 || - peer_identity_.empty () || peer_identity_ [0] != 0) { - - // If we already know the peer name do nothing, just check whether - // it haven't changed. - zmq_assert (peer_identity == peer_identity_); - } - } - else if (!peer_identity_.empty ()) { - - // Store the peer identity. - peer_identity = peer_identity_; - - // If the session is not registered with the ordinal, let's register - // it using the peer name. - if (!ordinal) { - if (!owner->register_session (peer_identity, this)) { - - // TODO: There's already a session with the specified - // identity. We should presumably syslog it and drop the - // session. - zmq_assert (false); - } - } - } - // Check whether the required pipes already exist. If not so, we'll // create them and bind them to the socket object. reader_t *socket_reader = NULL; writer_t *socket_writer = NULL; if (options.requires_in && !out_pipe) { - create_pipe (owner, this, options.hwm, options.swap, &socket_reader, + create_pipe (socket, this, options.hwm, options.swap, &socket_reader, &out_pipe); out_pipe->set_event_sink (this); } if (options.requires_out && !in_pipe) { - create_pipe (this, owner, options.hwm, options.swap, &in_pipe, + create_pipe (this, socket, options.hwm, options.swap, &in_pipe, &socket_writer); in_pipe->set_event_sink (this); } if (socket_reader || socket_writer) - send_bind (owner, socket_reader, socket_writer, peer_identity); + send_bind (socket, socket_reader, socket_writer, peer_identity); // Plug in the engine. zmq_assert (!engine); zmq_assert (engine_); engine = engine_; - engine->plug (this); + engine->plug (io_thread, this); + + // Trigger the notfication about the attachment. + attached (peer_identity_); +} + +void zmq::session_t::process_term () +{ + // Here we are pugging into the own_t's termination mechanism. + // The goal is to postpone the termination till all the pending messages + // are sent to the peer. + term_processed = true; + finalise (); +} + +void zmq::session_t::attached (const blob_t &peer_identity_) +{ +} + +void zmq::session_t::detached () +{ } -- cgit v1.2.3