diff options
Diffstat (limited to 'src/session.cpp')
-rw-r--r-- | src/session.cpp | 274 |
1 files changed, 138 insertions, 136 deletions
diff --git a/src/session.cpp b/src/session.cpp index f798877..4c448af 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -20,58 +20,47 @@ #include <new> #include "session.hpp" +#include "socket_base.hpp" #include "i_engine.hpp" #include "err.hpp" #include "pipe.hpp" -zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_) : - owned_t (parent_, owner_), +zmq::session_t::session_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_) : + own_t (io_thread_), + options (options_), in_pipe (NULL), incomplete_in (false), - active (true), out_pipe (NULL), engine (NULL), - options (options_) + socket (socket_), + io_thread (io_thread_), + attach_processed (false), + term_processed (false), + finalised (false) { - // It's possible to register the session at this point as it will be - // searched for only on reconnect, i.e. no race condition (session found - // before it is plugged into it's I/O thread) is possible. - ordinal = owner->register_session (this); } -zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, const blob_t &peer_identity_) : - owned_t (parent_, owner_), - in_pipe (NULL), - incomplete_in (false), - active (true), - out_pipe (NULL), - engine (NULL), - ordinal (0), - peer_identity (peer_identity_), - options (options_) +zmq::session_t::~session_t () { - if (!peer_identity.empty () && peer_identity [0] != 0) { - if (!owner->register_session (peer_identity, this)) { + zmq_assert (!in_pipe); + zmq_assert (!out_pipe); - // TODO: There's already a session with the specified - // identity. We should presumably syslog it and drop the - // session. - zmq_assert (false); - } - } + if (engine) + engine->terminate (); } -zmq::session_t::~session_t () +void zmq::session_t::terminate () { - zmq_assert (!in_pipe); - zmq_assert (!out_pipe); + if (in_pipe) + in_pipe->terminate (); + if (out_pipe) + out_pipe->terminate (); } bool zmq::session_t::read (::zmq_msg_t *msg_) { - if (!in_pipe || !active) + if (!in_pipe) return false; if (!in_pipe->read (msg_)) @@ -97,17 +86,8 @@ void zmq::session_t::flush () out_pipe->flush (); } -void zmq::session_t::detach (owned_t *reconnecter_) +void zmq::session_t::clean_pipes () { - // Plug in the reconnecter object if any. - if (reconnecter_) { - send_plug (reconnecter_); - send_own (owner, reconnecter_); - } - - // Engine is terminating itself. No need to deallocate it from here. - engine = NULL; - // Get rid of half-processed messages in the out pipe. Flush any // unflushed messages upstream. if (out_pipe) { @@ -127,26 +107,6 @@ void zmq::session_t::detach (owned_t *reconnecter_) zmq_msg_close (&msg); } } - - // Terminate transient session. - if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0)) - term (); -} - -zmq::io_thread_t *zmq::session_t::get_io_thread () -{ - return choose_io_thread (options.affinity); -} - -class zmq::socket_base_t *zmq::session_t::get_owner () -{ - return owner; -} - -uint64_t zmq::session_t::get_ordinal () -{ - zmq_assert (ordinal); - return ordinal; } void zmq::session_t::attach_pipes (class reader_t *inpipe_, @@ -155,108 +115,113 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_, if (inpipe_) { zmq_assert (!in_pipe); in_pipe = inpipe_; - active = true; - in_pipe->set_endpoint (this); + in_pipe->set_event_sink (this); } if (outpipe_) { zmq_assert (!out_pipe); out_pipe = outpipe_; - out_pipe->set_endpoint (this); + out_pipe->set_event_sink (this); + } + + // If we are already terminating, terminate the pipes straight away. + if (finalised) { + if (in_pipe) { + register_term_acks (1); + in_pipe->terminate (); + } + if (out_pipe) { + register_term_acks (1); + out_pipe->terminate (); + } + return; } + + attach_processed = true; + finalise (); } -void zmq::session_t::detach_inpipe (reader_t *pipe_) +void zmq::session_t::terminated (reader_t *pipe_) { - active = false; + zmq_assert (in_pipe == pipe_); in_pipe = NULL; + + if (finalised) { + unregister_term_ack (); + return; + } + + finalise (); } -void zmq::session_t::detach_outpipe (writer_t *pipe_) +void zmq::session_t::terminated (writer_t *pipe_) { + zmq_assert (out_pipe == pipe_); out_pipe = NULL; -} -void zmq::session_t::kill (reader_t *pipe_) -{ - active = false; + if (finalised) { + unregister_term_ack (); + return; + } + + finalise (); } -void zmq::session_t::revive (reader_t *pipe_) +void zmq::session_t::activated (reader_t *pipe_) { zmq_assert (in_pipe == pipe_); - active = true; + if (engine) - engine->revive (); + engine->activate_out (); } -void zmq::session_t::revive (writer_t *pipe_) +void zmq::session_t::activated (writer_t *pipe_) { zmq_assert (out_pipe == pipe_); if (engine) - engine->resume_input (); + engine->activate_in (); } void zmq::session_t::process_plug () { } -void zmq::session_t::process_unplug () +void zmq::session_t::finalise () { - // Unregister the session from the socket. - if (ordinal) - owner->unregister_session (ordinal); - else if (!peer_identity.empty () && peer_identity [0] != 0) - owner->unregister_session (peer_identity); - - // Ask associated pipes to terminate. - if (in_pipe) { - in_pipe->term (); - in_pipe = NULL; - } - if (out_pipe) { - out_pipe->term (); - out_pipe = NULL; - } - - if (engine) { - engine->unplug (); - delete engine; - engine = NULL; + // There may be delimiter waiting in the inbound pipe, never to be read + // because the connection cannot be established. In order to terminate + // decently in such case, do check_read which will in turn start the pipe + // termination process if there's delimiter in it. + if (in_pipe) + in_pipe->check_read (); + + // If all conditions are met, proceed with termination: + // 1. Owner object already asked us to terminate. + // 2. The pipes were already attached to the session. + // 3. Both pipes have already terminated. Note that inbound pipe + // is terminated after delimiter is read, i.e. all messages + // were already sent to the wire. + if (term_processed && attach_processed && !in_pipe && !out_pipe) { + finalised = true; + own_t::process_term (); } } void zmq::session_t::process_attach (i_engine *engine_, const blob_t &peer_identity_) { - if (!peer_identity.empty ()) { - - // If both IDs are temporary, no checking is needed. - // TODO: Old ID should be reused in this case... - if (peer_identity.empty () || peer_identity [0] != 0 || - peer_identity_.empty () || peer_identity_ [0] != 0) { - - // If we already know the peer name do nothing, just check whether - // it haven't changed. - zmq_assert (peer_identity == peer_identity_); - } + // If some other object (e.g. init) notifies us that the connection failed + // we need to start the reconnection process. + if (!engine_) { + zmq_assert (!engine); + detached (); + return; } - else if (!peer_identity_.empty ()) { - - // Store the peer identity. - peer_identity = peer_identity_; - // If the session is not registered with the ordinal, let's register - // it using the peer name. - if (!ordinal) { - if (!owner->register_session (peer_identity, this)) { - - // TODO: There's already a session with the specified - // identity. We should presumably syslog it and drop the - // session. - zmq_assert (false); - } - } + // If we are already terminating, we destroy the engine straight away. + if (finalised) { + delete engine; + return; } // Check whether the required pipes already exist. If not so, we'll @@ -265,27 +230,64 @@ void zmq::session_t::process_attach (i_engine *engine_, writer_t *socket_writer = NULL; if (options.requires_in && !out_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm, options.swap); - zmq_assert (pipe); - out_pipe = &pipe->writer; - out_pipe->set_endpoint (this); - socket_reader = &pipe->reader; + create_pipe (socket, this, options.hwm, options.swap, &socket_reader, + &out_pipe); + out_pipe->set_event_sink (this); } if (options.requires_out && !in_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm, options.swap); - zmq_assert (pipe); - in_pipe = &pipe->reader; - in_pipe->set_endpoint (this); - socket_writer = &pipe->writer; + create_pipe (this, socket, options.hwm, options.swap, &in_pipe, + &socket_writer); + in_pipe->set_event_sink (this); } if (socket_reader || socket_writer) - send_bind (owner, socket_reader, socket_writer, peer_identity); + send_bind (socket, socket_reader, socket_writer, peer_identity_); // Plug in the engine. zmq_assert (!engine); zmq_assert (engine_); engine = engine_; - engine->plug (this); + engine->plug (io_thread, this); + + attach_processed = true; + + // Trigger the notfication about the attachment. + attached (peer_identity_); +} + +void zmq::session_t::detach () +{ + // Engine is dead. Let's forget about it. + engine = NULL; + + detached (); +} + +void zmq::session_t::process_term () +{ + // Here we are pugging into the own_t's termination mechanism. + // The goal is to postpone the termination till all the pending messages + // are sent to the peer. + term_processed = true; + finalise (); +} + +bool zmq::session_t::register_session (const blob_t &name_, session_t *session_) +{ + return socket->register_session (name_, session_); } + +void zmq::session_t::unregister_session (const blob_t &name_) +{ + socket->unregister_session (name_); +} + +void zmq::session_t::attached (const blob_t &peer_identity_) +{ +} + +void zmq::session_t::detached () +{ +} + |