diff options
Diffstat (limited to 'src/session.hpp')
-rw-r--r-- | src/session.hpp | 93 |
1 files changed, 62 insertions, 31 deletions
diff --git a/src/session.hpp b/src/session.hpp index 9bda1ad..38cf317 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -20,54 +20,82 @@ #ifndef __ZMQ_SESSION_HPP_INCLUDED__ #define __ZMQ_SESSION_HPP_INCLUDED__ +#include "own.hpp" #include "i_inout.hpp" -#include "i_endpoint.hpp" -#include "owned.hpp" #include "options.hpp" #include "blob.hpp" +#include "pipe.hpp" namespace zmq { - class session_t : public owned_t, public i_inout, public i_endpoint + class session_t : + public own_t, + public i_inout, + public i_reader_events, + public i_writer_events { public: - // Creates unnamed session. - session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_); + session_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_); - // Creates named session. - session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, const blob_t &peer_identity_); - - // i_inout interface implementation. + // i_inout interface implementation. Note that detach method is not + // implemented by generic session. Different session types may handle + // engine disconnection in different ways. bool read (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_); void flush (); - void detach (owned_t *reconnecter_); - class io_thread_t *get_io_thread (); - class socket_base_t *get_owner (); - uint64_t get_ordinal (); + void detach (); - // i_endpoint interface implementation. void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - void detach_inpipe (class reader_t *pipe_); - void detach_outpipe (class writer_t *pipe_); - void kill (class reader_t *pipe_); - void revive (class reader_t *pipe_); - void revive (class writer_t *pipe_); - private: + // i_reader_events interface implementation. + void activated (class reader_t *pipe_); + void terminated (class reader_t *pipe_); + + // i_writer_events interface implementation. + void activated (class writer_t *pipe_); + void terminated (class writer_t *pipe_); + + protected: + + // Forcefully close this session (without sending + // outbound messages to the wire). + void terminate (); + + // Two events for the derived session type. Attached is triggered + // when session is attached to a peer, detached is triggered at the + // beginning of the termination process when session is about to + // be detached from the peer. + virtual void attached (const blob_t &peer_identity_); + virtual void detached (); + + // Allows derives session types to (un)register session names. + bool register_session (const blob_t &name_, class session_t *session_); + void unregister_session (const blob_t &name_); ~session_t (); + // Remove any half processed messages. Flush unflushed messages. + // Call this function when engine disconnect to get rid of leftovers. + void clean_pipes (); + + // Inherited socket options. These are visible to all session classes. + options_t options; + + private: + // Handlers for incoming commands. void process_plug (); - void process_unplug (); void process_attach (struct i_engine *engine_, const blob_t &peer_identity_); + void process_term (); + + // Check whether object is ready for termination. If so proceed + // with closing child objects. + void finalise (); // Inbound pipe, i.e. one the session is getting messages from. class reader_t *in_pipe; @@ -82,18 +110,21 @@ namespace zmq // Outbound pipe, i.e. one the socket is sending messages to. class writer_t *out_pipe; + // The protocol I/O engine connected to the session. struct i_engine *engine; - // Session is identified by ordinal in the case when it was created - // before connection to the peer was established and thus we are - // unaware of peer's identity. - uint64_t ordinal; + // The socket the session belongs to. + class socket_base_t *socket; - // Identity of the peer. - blob_t peer_identity; + // I/O thread the session is living in. It will be used to plug in + // the engines into the same thread. + class io_thread_t *io_thread; - // Inherited socket options. - options_t options; + // True if pipes were already attached. + bool attach_processed; + + // True if term command was already processed. + bool term_processed; session_t (const session_t&); void operator = (const session_t&); |