summaryrefslogtreecommitdiff
path: root/src/session.hpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-08-11 14:09:56 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-08-25 15:39:20 +0200
commitd13933bc62fce71b5a58118020e0dd3776e79aa9 (patch)
tree6586d5b9cc637dbf8acae4b32d24da9c8e046014 /src/session.hpp
parentee1f1af0091d9bdffa0e5ce1783da925b3cd7e56 (diff)
I/O object hierarchy implemented
Diffstat (limited to 'src/session.hpp')
-rw-r--r--src/session.hpp73
1 files changed, 47 insertions, 26 deletions
diff --git a/src/session.hpp b/src/session.hpp
index 603b50c..ba259dc 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -20,8 +20,8 @@
#ifndef __ZMQ_SESSION_HPP_INCLUDED__
#define __ZMQ_SESSION_HPP_INCLUDED__
+#include "own.hpp"
#include "i_inout.hpp"
-#include "owned.hpp"
#include "options.hpp"
#include "blob.hpp"
#include "pipe.hpp"
@@ -30,29 +30,22 @@ namespace zmq
{
class session_t :
- public owned_t,
+ public own_t,
public i_inout,
public i_reader_events,
public i_writer_events
{
public:
- // Creates unnamed session.
- session_t (object_t *parent_, socket_base_t *owner_,
- const options_t &options_);
+ session_t (class io_thread_t *io_thread_,
+ class socket_base_t *socket_, const options_t &options_);
- // Creates named session.
- session_t (object_t *parent_, socket_base_t *owner_,
- const options_t &options_, const blob_t &peer_identity_);
-
- // i_inout interface implementation.
+ // i_inout interface implementation. Note that detach method is not
+ // implemented by generic session. Different session types may handle
+ // engine disconnection in different ways.
bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_);
void flush ();
- void detach (owned_t *reconnecter_);
- class io_thread_t *get_io_thread ();
- class socket_base_t *get_owner ();
- uint64_t get_ordinal ();
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
@@ -65,19 +58,40 @@ namespace zmq
void activated (class writer_t *pipe_);
void terminated (class writer_t *pipe_);
- private:
+ protected:
+
+ // Forcefully close this session (without sending
+ // outbound messages to the wire).
+ void terminate ();
+
+ // Two events for the derived session type. Attached is triggered
+ // when session is attached to a peer, detached is triggered at the
+ // beginning of the termination process when session is about to
+ // be detached from the peer.
+ virtual void attached (const blob_t &peer_identity_);
+ virtual void detached ();
~session_t ();
- // Define the delayed termination. (I.e. termination is postponed
- // till all the data is flushed to the kernel.)
- bool is_terminable ();
+ // Remove any half processed messages. Flush unflushed messages.
+ // Call this function when engine disconnect to get rid of leftovers.
+ void clean_pipes ();
+
+ // Inherited socket options. These are visible to all session classes.
+ options_t options;
+
+ private:
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
void process_attach (struct i_engine *engine_,
const blob_t &peer_identity_);
+ void process_term ();
+
+ // Check whether object is ready for termination. If so proceed
+ // with closing child objects.
+ void finalise ();
// Inbound pipe, i.e. one the session is getting messages from.
class reader_t *in_pipe;
@@ -92,18 +106,25 @@ namespace zmq
// Outbound pipe, i.e. one the socket is sending messages to.
class writer_t *out_pipe;
+ // The protocol I/O engine connected to the session.
struct i_engine *engine;
- // Session is identified by ordinal in the case when it was created
- // before connection to the peer was established and thus we are
- // unaware of peer's identity.
- uint64_t ordinal;
-
- // Identity of the peer.
+ // Identity of the peer (say the component on the other side
+ // of TCP connection).
blob_t peer_identity;
- // Inherited socket options.
- options_t options;
+ // The socket the session belongs to.
+ class socket_base_t *socket;
+
+ // I/O thread the session is living in. It will be used to plug in
+ // the engines into the same thread.
+ class io_thread_t *io_thread;
+
+ // True if pipes were already attached.
+ bool attach_processed;
+
+ // True if term command was already processed.
+ bool term_processed;
session_t (const session_t&);
void operator = (const session_t&);