/*
Copyright (c) 2009-2012 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of Crossroads I/O project.
Crossroads I/O is free software; you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
Crossroads is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __XS_PIPE_HPP_INCLUDED__
#define __XS_PIPE_HPP_INCLUDED__
#include "msg.hpp"
#include "ypipe.hpp"
#include "config.hpp"
#include "object.hpp"
#include "stdint.hpp"
#include "array.hpp"
#include "blob.hpp"
namespace xs
{
class object_t;
class pipe_t;
// Create a pipepair for bi-directional transfer of messages. All array
// arguments have two elements, one per endpoint of the pair.
//
// parents_  - parent objects of the two pipe endpoints.
// pipes_    - output; filled in with the two newly created pipe objects.
// hwms_     - high watermarks. First HWM is for messages passed from first
//             pipe to the second pipe. Second HWM is for messages passed
//             from second pipe to the first pipe.
// delays_   - specifies how each pipe behaves when the peer terminates.
//             If true, the pipe receives all the pending messages before
//             terminating, otherwise it terminates straight away.
// protocol_ - ID of the protocol associated with the pipes. It is not
//             interpreted by the pipe itself, only by the pipe's users.
//
// NOTE(review): the meaning of the int return value is not visible in this
// header - confirm against the definition before relying on it.
int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2],
int hwms_ [2], bool delays_ [2], int protocol_);
// Interface to be implemented by objects that want to receive events
// from a pipe. The receiver is registered using pipe_t::set_event_sink.
// Each callback gets the pipe that generated the event as its argument.
struct i_pipe_events
{
// Virtual destructor so that implementations can be destroyed through
// a pointer to this interface.
virtual ~i_pipe_events () {}
// NOTE(review): presumably signalled when the pipe becomes readable
// again after it was found empty - confirm against pipe.cpp.
virtual void read_activated (xs::pipe_t *pipe_) = 0;
// NOTE(review): presumably signalled when the pipe becomes writable
// again after hitting the high watermark - confirm against pipe.cpp.
virtual void write_activated (xs::pipe_t *pipe_) = 0;
// Generated as a consequence of the peer calling pipe_t::hiccup.
virtual void hiccuped (xs::pipe_t *pipe_) = 0;
// Notifies the sink about the actual deallocation of the pipe after
// its termination was requested (see pipe_t::terminate).
virtual void terminated (xs::pipe_t *pipe_) = 0;
};
// One endpoint of a bi-directional message pipe. Endpoints are always
// created in pairs by the pipepair function and destroy themselves once
// the termination handshake is finished.
//
// Note that pipe can be stored in three different arrays.
// The array of inbound pipes (1), the array of outbound pipes (2) and
// the generic array of pipes to deallocate (3). The numbers presumably
// match the array_item_t <N> base classes below.
class pipe_t :
public object_t,
public array_item_t <1>,
public array_item_t <2>,
public array_item_t <3>
{
// This allows pipepair to create pipe objects (the constructor and
// set_peer are private).
friend int pipepair (xs::object_t *parents_ [2],
xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2],
int protocol_);
public:
// Specifies the object to send events to.
void set_event_sink (i_pipe_events *sink_);
// Pipe endpoint can store an opaque ID to be used by its clients.
// The getter returns the identity by value.
void set_identity (const blob_t &identity_);
blob_t get_identity ();
// Returns true if there is at least one message to read in the pipe.
bool check_read ();
// Reads a message from the underlying pipe into msg_.
// NOTE(review): presumably returns false when there is nothing to
// read - confirm against pipe.cpp.
bool read (msg_t *msg_);
// Checks whether messages can be written to the pipe. If writing
// the message would hit the high watermark the function returns false.
bool check_write (msg_t *msg_);
// Writes a message to the underlying pipe. Returns false if the
// message cannot be written because high watermark was reached.
bool write (msg_t *msg_);
// Remove unfinished parts of the outbound message from the pipe.
void rollback ();
// Flush the messages downstream.
void flush ();
// Temporarily disconnects the inbound message stream and drops
// all the messages on the fly. Causes 'hiccuped' event to be generated
// in the peer.
void hiccup ();
// Ask pipe to terminate. The termination will happen asynchronously
// and user will be notified about actual deallocation by 'terminated'
// event. If delay is true, the pending messages will be processed
// before actual shutdown.
void terminate (bool delay_);
// Returns the ID of the protocol associated with the pipe.
int get_protocol ();
private:
// Type of the underlying lock-free pipe.
// NOTE(review): ypipe_t is used here without template arguments. If
// ypipe_t is a class template (ypipe.hpp is not visible from here), the
// argument list appears to have been lost and this line will not
// compile - confirm against ypipe.hpp and restore it if needed.
typedef ypipe_t upipe_t;
// Command handlers.
void process_activate_read ();
void process_activate_write (uint64_t msgs_read_);
void process_hiccup (void *pipe_);
void process_pipe_term ();
void process_pipe_term_ack ();
// Handler for delimiter read from the pipe.
void delimit ();
// Constructor is private. Pipe can only be created using
// pipepair function.
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
int inhwm_, int outhwm_, bool delay_, int protocol_);
// Pipepair uses this function to let us know about
// the peer pipe object.
void set_peer (pipe_t *pipe_);
// Destructor is private. Pipe objects destroy themselves.
~pipe_t ();
// Underlying pipes for both directions: inpipe is the one this endpoint
// reads from, outpipe is the one it writes to.
upipe_t *inpipe;
upipe_t *outpipe;
// Can the pipe be read from / written to?
bool in_active;
bool out_active;
// High watermark for the outbound pipe.
int hwm;
// Low watermark for the inbound pipe.
int lwm;
// Number of messages read and written so far.
uint64_t msgs_read;
uint64_t msgs_written;
// Last received peer's msgs_read. The actual number in the peer
// can be higher at the moment.
uint64_t peers_msgs_read;
// The pipe object on the other side of the pipepair.
pipe_t *peer;
// Sink to send events to.
i_pipe_events *sink;
// State of the pipe endpoint:
//
// active            - common state before any termination begins.
// delimited         - delimiter was read from pipe before term command
//                     was received.
// pending           - term command was already received from the peer
//                     but there are still pending messages to read.
// terminating       - all pending messages were already read and all we
//                     are waiting for is ack from the peer.
// terminated        - 'terminate' was explicitly called by the user.
// double_terminated - user called 'terminate' and then we've got term
//                     command from the peer as well.
enum {
active,
delimited,
pending,
terminating,
terminated,
double_terminated
} state;
// If true, we receive all the pending inbound messages before
// terminating. If false, we terminate immediately when the peer
// asks us to.
bool delay;
// ID of the protocol to use. This value is not used by the pipe
// itself, rather it's used by the users of the pipe.
int protocol;
// Identity of the writer. Used uniquely by the reader side.
blob_t identity;
// Returns true if the message is delimiter; false otherwise.
static bool is_delimiter (msg_t &msg_);
// Computes appropriate low watermark from the given high watermark.
static int compute_lwm (int hwm_);
// Disable copying. The copy constructor and assignment operator are
// declared private so that outside code cannot copy a pipe.
pipe_t (const pipe_t&);
const pipe_t &operator = (const pipe_t&);
};
}
#endif