diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2010-08-11 14:09:56 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-08-25 15:39:20 +0200 |
commit | d13933bc62fce71b5a58118020e0dd3776e79aa9 (patch) | |
tree | 6586d5b9cc637dbf8acae4b32d24da9c8e046014 /src/session.hpp | |
parent | ee1f1af0091d9bdffa0e5ce1783da925b3cd7e56 (diff) |
I/O object hierarchy implemented
Diffstat (limited to 'src/session.hpp')
-rw-r--r-- | src/session.hpp | 73 |
1 files changed, 47 insertions, 26 deletions
diff --git a/src/session.hpp b/src/session.hpp index 603b50c..ba259dc 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -20,8 +20,8 @@ #ifndef __ZMQ_SESSION_HPP_INCLUDED__ #define __ZMQ_SESSION_HPP_INCLUDED__ +#include "own.hpp" #include "i_inout.hpp" -#include "owned.hpp" #include "options.hpp" #include "blob.hpp" #include "pipe.hpp" @@ -30,29 +30,22 @@ namespace zmq { class session_t : - public owned_t, + public own_t, public i_inout, public i_reader_events, public i_writer_events { public: - // Creates unnamed session. - session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_); + session_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_); - // Creates named session. - session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, const blob_t &peer_identity_); - - // i_inout interface implementation. + // i_inout interface implementation. Note that detach method is not + // implemented by generic session. Different session types may handle + // engine disconnection in different ways. bool read (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_); void flush (); - void detach (owned_t *reconnecter_); - class io_thread_t *get_io_thread (); - class socket_base_t *get_owner (); - uint64_t get_ordinal (); void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); @@ -65,19 +58,40 @@ namespace zmq void activated (class writer_t *pipe_); void terminated (class writer_t *pipe_); - private: + protected: + + // Forcefully close this session (without sending + // outbound messages to the wire). + void terminate (); + + // Two events for the derived session type. Attached is triggered + // when session is attached to a peer, detached is triggered at the + // beginning of the termination process when session is about to + // be detached from the peer. + virtual void attached (const blob_t &peer_identity_); + virtual void detached (); ~session_t (); - // Define the delayed termination. (I.e. termination is postponed - // till all the data is flushed to the kernel.) - bool is_terminable (); + // Remove any half processed messages. Flush unflushed messages. + // Call this function when engine disconnect to get rid of leftovers. + void clean_pipes (); + + // Inherited socket options. These are visible to all session classes. + options_t options; + + private: // Handlers for incoming commands. void process_plug (); void process_unplug (); void process_attach (struct i_engine *engine_, const blob_t &peer_identity_); + void process_term (); + + // Check whether object is ready for termination. If so proceed + // with closing child objects. + void finalise (); // Inbound pipe, i.e. one the session is getting messages from. class reader_t *in_pipe; @@ -92,18 +106,25 @@ namespace zmq // Outbound pipe, i.e. one the socket is sending messages to. class writer_t *out_pipe; + // The protocol I/O engine connected to the session. struct i_engine *engine; - // Session is identified by ordinal in the case when it was created - // before connection to the peer was established and thus we are - // unaware of peer's identity. - uint64_t ordinal; - - // Identity of the peer. + // Identity of the peer (say the component on the other side + // of TCP connection). blob_t peer_identity; - // Inherited socket options. - options_t options; + // The socket the session belongs to. + class socket_base_t *socket; + + // I/O thread the session is living in. It will be used to plug in + // the engines into the same thread. + class io_thread_t *io_thread; + + // True if pipes were already attached. + bool attach_processed; + + // True if term command was already processed. + bool term_processed; session_t (const session_t&); void operator = (const session_t&); |