summaryrefslogtreecommitdiff
path: root/src/pipe.hpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-05-22 17:26:53 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-05-22 17:26:53 +0200
commitacf0b0e515515e51ad32ba7a2d147ce703579478 (patch)
treed2032009cf46c23aa0f677c2216914f718ab968a /src/pipe.hpp
parent9e6b39925603f9e64db08c469bd628d7ef9465de (diff)
Introduces bi-directional pipes
So far, there was a pair of unidirectional pipes between a socket and a session (or an inproc peer). This resulted in complex problems with half-closed states and tracking which inpipe corresponds to which outpipe. This patch doesn't add any functionality in itself, but is essential for further work on features like subscription forwarding. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/pipe.hpp')
-rw-r--r--src/pipe.hpp201
1 files changed, 85 insertions, 116 deletions
diff --git a/src/pipe.hpp b/src/pipe.hpp
index 75b5c47..fcba877 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -22,47 +22,43 @@
#define __ZMQ_PIPE_HPP_INCLUDED__
#include "msg.hpp"
-#include "array.hpp"
#include "ypipe.hpp"
#include "config.hpp"
#include "object.hpp"
-#include "stdint.hpp"
+#include "array.hpp"
namespace zmq
{
- // Creates a pipe. Returns pointer to reader and writer objects.
- void create_pipe (object_t *reader_parent_, object_t *writer_parent_,
- int hwm_, class reader_t **reader_, class writer_t **writer_);
-
- // The shutdown mechanism for pipe works as follows: Either endpoint
- // (or even both of them) can ask pipe to terminate by calling 'terminate'
- // method. Pipe then terminates in asynchronous manner. When the part of
- // the shutdown tied to the endpoint is done it triggers 'terminated'
- // event. When endpoint processes the event and returns, associated
- // reader/writer object is deallocated.
-
- typedef ypipe_t <msg_t, message_pipe_granularity> pipe_t;
+ // Create a pipepair for bi-directional transfer of messages.
+ // First HWM is for messages passed from first pipe to the second pipe.
+ // Second HWM is for messages passed from second pipe to the first pipe.
+ // Delay specifies whether the pipe receives all the pending messages
+ // before terminating or whether it terminates straight away.
+ int pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
+ int hwms_ [2], bool delays_ [2]);
- struct i_reader_events
+ struct i_pipe_events
{
- virtual ~i_reader_events () {}
+ virtual ~i_pipe_events () {}
- virtual void terminated (class reader_t *pipe_) = 0;
- virtual void activated (class reader_t *pipe_) = 0;
- virtual void delimited (class reader_t *pipe_) = 0;
+ virtual void read_activated (class pipe_t *pipe_) = 0;
+ virtual void write_activated (class pipe_t *pipe_) = 0;
+ virtual void terminated (class pipe_t *pipe_) = 0;
};
- class reader_t : public object_t, public array_item_t
+ class pipe_t :
+ public object_t,
+ public array_item_t
{
- friend void create_pipe (object_t*, object_t*, int,
- reader_t**, writer_t**);
- friend class writer_t;
+ // This allows pipepair to create pipe objects.
+ friend int pipepair (class object_t *parents_ [2],
+ class pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]);
public:
- // Specifies the object to get events from the reader.
- void set_event_sink (i_reader_events *endpoint_);
+ // Specifies the object to send events to.
+ void set_event_sink (i_pipe_events *sink_);
// Returns true if there is at least one message to read in the pipe.
bool check_read ();
@@ -70,127 +66,100 @@ namespace zmq
// Reads a message to the underlying pipe.
bool read (msg_t *msg_);
- // Ask pipe to terminate.
- void terminate ();
-
- private:
-
- reader_t (class object_t *parent_, pipe_t *pipe_, int lwm_);
- ~reader_t ();
-
- // To be called only by writer itself!
- void set_writer (class writer_t *writer_);
-
- // Command handlers.
- void process_activate_reader ();
- void process_pipe_term_ack ();
-
- // Returns true if the message is delimiter; false otherwise.
- static bool is_delimiter (msg_t &msg_);
-
- // True, if pipe can be read from.
- bool active;
-
- // The underlying pipe.
- pipe_t *pipe;
-
- // Pipe writer associated with the other side of the pipe.
- class writer_t *writer;
-
- // Low watermark for in-memory storage (in bytes).
- int lwm;
-
- // Number of messages read so far.
- uint64_t msgs_read;
-
- // Sink for the events (either the socket of the session).
- i_reader_events *sink;
-
- // True is 'terminate' method was called or delimiter
- // was read from the pipe.
- bool terminating;
-
- reader_t (const reader_t&);
- const reader_t &operator = (const reader_t&);
- };
-
- struct i_writer_events
- {
- virtual ~i_writer_events () {}
-
- virtual void terminated (class writer_t *pipe_) = 0;
- virtual void activated (class writer_t *pipe_) = 0;
- };
-
- class writer_t : public object_t, public array_item_t
- {
- friend void create_pipe (object_t*, object_t*, int,
- reader_t**, writer_t**);
-
- public:
-
- // Specifies the object to get events from the writer.
- void set_event_sink (i_writer_events *endpoint_);
-
- // Checks whether messages can be written to the pipe.
- // If writing the message would cause high watermark
- // the function returns false.
+ // Checks whether messages can be written to the pipe. If writing
+ // the message would cause high watermark the function returns false.
bool check_write (msg_t *msg_);
// Writes a message to the underlying pipe. Returns false if the
// message cannot be written because high watermark was reached.
bool write (msg_t *msg_);
- // Remove unfinished part of a message from the pipe.
+ // Remove unfinished parts of the outbound message from the pipe.
void rollback ();
// Flush the messages downsteam.
void flush ();
- // Ask pipe to terminate.
+ // Ask pipe to terminate. The termination will happen asynchronously
+ // and user will be notified about actual deallocation by 'terminated'
+ // event.
void terminate ();
private:
- writer_t (class object_t *parent_, pipe_t *pipe_, reader_t *reader_,
- int hwm_);
- ~writer_t ();
-
// Command handlers.
- void process_activate_writer (uint64_t msgs_read_);
+ void process_activate_read ();
+ void process_activate_write (uint64_t msgs_read_);
void process_pipe_term ();
+ void process_pipe_term_ack ();
+
+ // Type of the underlying lock-free pipe.
+ typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t;
- // Tests whether underlying pipe is already full.
- bool pipe_full ();
+ // Constructor is private. Pipe can only be created using
+ // pipepair function.
+ pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
+ int inhwm_, int outhwm_, bool delay_);
- // True, if this object can be written to.
- bool active;
+ // Pipepair uses this function to let us know about
+ // the peer pipe object.
+ void set_peer (pipe_t *pipe_);
- // The underlying pipe.
- pipe_t *pipe;
+ // Destructor is private. Pipe objects destroy themselves.
+ ~pipe_t ();
- // Pipe reader associated with the other side of the pipe.
- reader_t *reader;
+ // Underlying pipes for both directions.
+ upipe_t *inpipe;
+ upipe_t *outpipe;
- // High watermark for in-memory storage (in bytes).
+ // Can the pipe be read from / written to?
+ bool in_active;
+ bool out_active;
+
+ // High watermark for the outbound pipe.
int hwm;
- // Last confirmed number of messages read from the pipe.
- // The actual number can be higher.
- uint64_t msgs_read;
+ // Low watermark for the inbound pipe.
+ int lwm;
- // Number of messages we have written so far.
+ // Number of messages read and written so far.
+ uint64_t msgs_read;
uint64_t msgs_written;
- // Sink for the events (either the socket or the session).
- i_writer_events *sink;
+ // Last received peer's msgs_read. The actual number in the peer
+ // can be higher at the moment.
+ uint64_t peers_msgs_read;
- // True is 'terminate' method was called of 'pipe_term' command
- // arrived from the reader.
+ // The pipe object on the other side of the pipepair.
+ pipe_t *peer;
+
+ // Sink to send events to.
+ i_pipe_events *sink;
+
+ // True is 'terminate' method was called or termination request
+ // was received from the peer.
bool terminating;
- writer_t (const writer_t&);
- const writer_t &operator = (const writer_t&);
+ // True is we've already got pipe_term command from the peer.
+ bool term_recvd;
+
+ // True if delimiter was already received from the peer.
+ bool delimited;
+
+ // If true, we receive all the pending inbound messages before
+ // terminating. If false, we terminate immediately when the peer
+ // asks us to.
+ bool delay;
+
+ // Returns true if the message is delimiter; false otherwise.
+ static bool is_delimiter (msg_t &msg_);
+
+ // Computes appropriate low watermark from the given high watermark.
+ static int compute_lwm (int hwm_);
+
+ // Disable copying.
+ pipe_t (const pipe_t&);
+ const pipe_t &operator = (const pipe_t&);
};
}