diff options
Diffstat (limited to 'src/session.hpp')
-rw-r--r-- | src/session.hpp | 115 |
1 files changed, 80 insertions, 35 deletions
diff --git a/src/session.hpp b/src/session.hpp index 9bda1ad..8adda5e 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -20,54 +20,83 @@ #ifndef __ZMQ_SESSION_HPP_INCLUDED__ #define __ZMQ_SESSION_HPP_INCLUDED__ +#include "own.hpp" #include "i_inout.hpp" -#include "i_endpoint.hpp" -#include "owned.hpp" -#include "options.hpp" +#include "io_object.hpp" #include "blob.hpp" +#include "pipe.hpp" namespace zmq { - class session_t : public owned_t, public i_inout, public i_endpoint + class session_t : + public own_t, + public io_object_t, + public i_inout, + public i_reader_events, + public i_writer_events { public: - // Creates unnamed session. - session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_); + session_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_); - // Creates named session. - session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, const blob_t &peer_identity_); - - // i_inout interface implementation. + // i_inout interface implementation. Note that detach method is not + // implemented by generic session. Different session types may handle + // engine disconnection in different ways. bool read (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_); void flush (); - void detach (owned_t *reconnecter_); - class io_thread_t *get_io_thread (); - class socket_base_t *get_owner (); - uint64_t get_ordinal (); + void detach (); - // i_endpoint interface implementation. void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - void detach_inpipe (class reader_t *pipe_); - void detach_outpipe (class writer_t *pipe_); - void kill (class reader_t *pipe_); - void revive (class reader_t *pipe_); - void revive (class writer_t *pipe_); - private: + // i_reader_events interface implementation. + void activated (class reader_t *pipe_); + void terminated (class reader_t *pipe_); + void delimited (class reader_t *pipe_); + + // i_writer_events interface implementation. + void activated (class writer_t *pipe_); + void terminated (class writer_t *pipe_); + + protected: + + // This function allows to shut down the session even though + // there are pending messages in the inbound pipe. + void terminate (); + + // Two events for the derived session type. Attached is triggered + // when session is attached to a peer, detached is triggered at the + // beginning of the termination process when session is about to + // be detached from the peer. + virtual void attached (const blob_t &peer_identity_) = 0; + virtual void detached () = 0; + + // Allows derives session types to (un)register session names. + bool register_session (const blob_t &name_, class session_t *session_); + void unregister_session (const blob_t &name_); ~session_t (); + private: + // Handlers for incoming commands. void process_plug (); - void process_unplug (); void process_attach (struct i_engine *engine_, const blob_t &peer_identity_); + void process_term (int linger_); + + // i_poll_events handlers. + void timer_event (int id_); + + // Remove any half processed messages. Flush unflushed messages. + // Call this function when engine disconnect to get rid of leftovers. + void clean_pipes (); + + // Call this function to move on with the delayed process_term. + void proceed_with_term (); // Inbound pipe, i.e. one the session is getting messages from. class reader_t *in_pipe; @@ -76,24 +105,40 @@ namespace zmq // is still in the in pipe. bool incomplete_in; - // If true, in_pipe is active. Otherwise there are no messages to get. - bool active; - // Outbound pipe, i.e. one the socket is sending messages to. class writer_t *out_pipe; + // The protocol I/O engine connected to the session. struct i_engine *engine; - // Session is identified by ordinal in the case when it was created - // before connection to the peer was established and thus we are - // unaware of peer's identity. - uint64_t ordinal; + // The socket the session belongs to. + class socket_base_t *socket; + + // I/O thread the session is living in. It will be used to plug in + // the engines into the same thread. + class io_thread_t *io_thread; + + // If true, pipes were already attached to this session. + bool pipes_attached; + + // If true, delimiter was already read from the inbound pipe. + bool delimiter_processed; + + // If true, we should terminate the session even though there are + // pending messages in the inbound pipe. + bool force_terminate; + + // ID of the linger timer + enum {linger_timer_id = 0x20}; - // Identity of the peer. - blob_t peer_identity; + // True is linger timer is running. + bool has_linger_timer; - // Inherited socket options. - options_t options; + enum { + active, + pending, + terminating + } state; session_t (const session_t&); void operator = (const session_t&); |