diff options
Diffstat (limited to 'src/session.cpp')
-rw-r--r-- | src/session.cpp | 295 |
1 files changed, 153 insertions, 142 deletions
diff --git a/src/session.cpp b/src/session.cpp index f798877..c926e63 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -20,58 +20,60 @@ #include <new> #include "session.hpp" +#include "socket_base.hpp" #include "i_engine.hpp" #include "err.hpp" #include "pipe.hpp" +#include "likely.hpp" -zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_) : - owned_t (parent_, owner_), +zmq::session_t::session_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_) : + own_t (io_thread_), + options (options_), in_pipe (NULL), incomplete_in (false), - active (true), out_pipe (NULL), engine (NULL), - options (options_) + socket (socket_), + io_thread (io_thread_), + delimiter_processed (false), + force_terminate (false), + state (active) { - // It's possible to register the session at this point as it will be - // searched for only on reconnect, i.e. no race condition (session found - // before it is plugged into it's I/O thread) is possible. - ordinal = owner->register_session (this); } -zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, const blob_t &peer_identity_) : - owned_t (parent_, owner_), - in_pipe (NULL), - incomplete_in (false), - active (true), - out_pipe (NULL), - engine (NULL), - ordinal (0), - peer_identity (peer_identity_), - options (options_) +zmq::session_t::~session_t () { - if (!peer_identity.empty () && peer_identity [0] != 0) { - if (!owner->register_session (peer_identity, this)) { + zmq_assert (!in_pipe); + zmq_assert (!out_pipe); - // TODO: There's already a session with the specified - // identity. We should presumably syslog it and drop the - // session. - zmq_assert (false); - } - } + if (engine) + engine->terminate (); } -zmq::session_t::~session_t () +void zmq::session_t::proceed_with_term () { - zmq_assert (!in_pipe); - zmq_assert (!out_pipe); + if (state == terminating) + return; + + zmq_assert (state == pending); + state = terminating; + + if (in_pipe) { + register_term_acks (1); + in_pipe->terminate (); + } + if (out_pipe) { + register_term_acks (1); + out_pipe->terminate (); + } + + own_t::process_term (); } bool zmq::session_t::read (::zmq_msg_t *msg_) { - if (!in_pipe || !active) + if (!in_pipe) return false; if (!in_pipe->read (msg_)) @@ -97,17 +99,8 @@ void zmq::session_t::flush () out_pipe->flush (); } -void zmq::session_t::detach (owned_t *reconnecter_) +void zmq::session_t::clean_pipes () { - // Plug in the reconnecter object if any. - if (reconnecter_) { - send_plug (reconnecter_); - send_own (owner, reconnecter_); - } - - // Engine is terminating itself. No need to deallocate it from here. - engine = NULL; - // Get rid of half-processed messages in the out pipe. Flush any // unflushed messages upstream. if (out_pipe) { @@ -127,26 +120,6 @@ void zmq::session_t::detach (owned_t *reconnecter_) zmq_msg_close (&msg); } } - - // Terminate transient session. - if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0)) - term (); -} - -zmq::io_thread_t *zmq::session_t::get_io_thread () -{ - return choose_io_thread (options.affinity); -} - -class zmq::socket_base_t *zmq::session_t::get_owner () -{ - return owner; -} - -uint64_t zmq::session_t::get_ordinal () -{ - zmq_assert (ordinal); - return ordinal; } void zmq::session_t::attach_pipes (class reader_t *inpipe_, @@ -155,108 +128,92 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_, if (inpipe_) { zmq_assert (!in_pipe); in_pipe = inpipe_; - active = true; - in_pipe->set_endpoint (this); + in_pipe->set_event_sink (this); } if (outpipe_) { zmq_assert (!out_pipe); out_pipe = outpipe_; - out_pipe->set_endpoint (this); + out_pipe->set_event_sink (this); + } + + // If we are already terminating, terminate the pipes straight away. + if (state == terminating) { + if (in_pipe) { + in_pipe->terminate (); + register_term_acks (1); + } + if (out_pipe) { + out_pipe->terminate (); + register_term_acks (1); + } } } -void zmq::session_t::detach_inpipe (reader_t *pipe_) +void zmq::session_t::delimited (reader_t *pipe_) { - active = false; - in_pipe = NULL; + zmq_assert (in_pipe == pipe_); + zmq_assert (!delimiter_processed); + delimiter_processed = true; + + // If we are in process of being closed, but still waiting for all + // pending messeges being sent, we can terminate here. + if (state == pending) + proceed_with_term (); } -void zmq::session_t::detach_outpipe (writer_t *pipe_) +void zmq::session_t::terminated (reader_t *pipe_) { - out_pipe = NULL; + zmq_assert (in_pipe == pipe_); + in_pipe = NULL; + if (state == terminating) + unregister_term_ack (); } -void zmq::session_t::kill (reader_t *pipe_) +void zmq::session_t::terminated (writer_t *pipe_) { - active = false; + zmq_assert (out_pipe == pipe_); + out_pipe = NULL; + if (state == terminating) + unregister_term_ack (); } -void zmq::session_t::revive (reader_t *pipe_) +void zmq::session_t::activated (reader_t *pipe_) { zmq_assert (in_pipe == pipe_); - active = true; - if (engine) - engine->revive (); + + if (likely (engine != NULL)) + engine->activate_out (); + else + in_pipe->check_read (); } -void zmq::session_t::revive (writer_t *pipe_) +void zmq::session_t::activated (writer_t *pipe_) { zmq_assert (out_pipe == pipe_); if (engine) - engine->resume_input (); + engine->activate_in (); } void zmq::session_t::process_plug () { } -void zmq::session_t::process_unplug () -{ - // Unregister the session from the socket. - if (ordinal) - owner->unregister_session (ordinal); - else if (!peer_identity.empty () && peer_identity [0] != 0) - owner->unregister_session (peer_identity); - - // Ask associated pipes to terminate. - if (in_pipe) { - in_pipe->term (); - in_pipe = NULL; - } - if (out_pipe) { - out_pipe->term (); - out_pipe = NULL; - } - - if (engine) { - engine->unplug (); - delete engine; - engine = NULL; - } -} - void zmq::session_t::process_attach (i_engine *engine_, const blob_t &peer_identity_) { - if (!peer_identity.empty ()) { - - // If both IDs are temporary, no checking is needed. - // TODO: Old ID should be reused in this case... - if (peer_identity.empty () || peer_identity [0] != 0 || - peer_identity_.empty () || peer_identity_ [0] != 0) { - - // If we already know the peer name do nothing, just check whether - // it haven't changed. - zmq_assert (peer_identity == peer_identity_); - } + // If some other object (e.g. init) notifies us that the connection failed + // we need to start the reconnection process. + if (!engine_) { + zmq_assert (!engine); + detached (); + return; } - else if (!peer_identity_.empty ()) { - // Store the peer identity. - peer_identity = peer_identity_; - - // If the session is not registered with the ordinal, let's register - // it using the peer name. - if (!ordinal) { - if (!owner->register_session (peer_identity, this)) { - - // TODO: There's already a session with the specified - // identity. We should presumably syslog it and drop the - // session. - zmq_assert (false); - } - } + // If we are already terminating, we destroy the engine straight away. + if (state == terminating) { + delete engine; + return; } // Check whether the required pipes already exist. If not so, we'll @@ -264,28 +221,82 @@ void zmq::session_t::process_attach (i_engine *engine_, reader_t *socket_reader = NULL; writer_t *socket_writer = NULL; + // Create the pipes, if required. if (options.requires_in && !out_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm, options.swap); - zmq_assert (pipe); - out_pipe = &pipe->writer; - out_pipe->set_endpoint (this); - socket_reader = &pipe->reader; + create_pipe (socket, this, options.hwm, options.swap, &socket_reader, + &out_pipe); + out_pipe->set_event_sink (this); } - if (options.requires_out && !in_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm, options.swap); - zmq_assert (pipe); - in_pipe = &pipe->reader; - in_pipe->set_endpoint (this); - socket_writer = &pipe->writer; + create_pipe (this, socket, options.hwm, options.swap, &in_pipe, + &socket_writer); + in_pipe->set_event_sink (this); } + // Bind the pipes to the socket object. if (socket_reader || socket_writer) - send_bind (owner, socket_reader, socket_writer, peer_identity); + send_bind (socket, socket_reader, socket_writer, peer_identity_); // Plug in the engine. zmq_assert (!engine); zmq_assert (engine_); engine = engine_; - engine->plug (this); + engine->plug (io_thread, this); + + // Trigger the notfication about the attachment. + attached (peer_identity_); +} + +void zmq::session_t::detach () +{ + // Engine is dead. Let's forget about it. + engine = NULL; + + detached (); +} + +void zmq::session_t::process_term () +{ + zmq_assert (state == active); + state = pending; + + // If there's no engine and there's only delimiter in the pipe it wouldn't + // be ever read. Thus we check for it explicitly. + if (in_pipe) + in_pipe->check_read (); + + // If there's no in pipe there are no pending messages to send. + // We can proceed with the shutdown straight away. Also, if there is + // inbound pipe, but the delimiter was already processed, we can + // terminate immediately. Alternatively, if the derived session type have + // called 'terminate' we'll finish straight away. + if (!options.requires_out || delimiter_processed || force_terminate || + (!options.immediate_connect && !in_pipe)) + proceed_with_term (); +} + +bool zmq::session_t::register_session (const blob_t &name_, session_t *session_) +{ + return socket->register_session (name_, session_); +} + +void zmq::session_t::unregister_session (const blob_t &name_) +{ + socket->unregister_session (name_); +} + +void zmq::session_t::attached (const blob_t &peer_identity_) +{ +} + +void zmq::session_t::detached () +{ + if (in_pipe) + in_pipe->check_read (); +} + +void zmq::session_t::terminate () +{ + force_terminate = true; + own_t::terminate (); } |