From acf0b0e515515e51ad32ba7a2d147ce703579478 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 22 May 2011 17:26:53 +0200 Subject: Introduces bi-directional pipes So far, there was a pair of unidirectional pipes between a socket and a session (or an inproc peer). This resulted in complex problems with half-closed states and tracking which inpipe corresponds to which outpipe. This patch doesn't add any functionality in itself, but is essential for further work on features like subscription forwarding. Signed-off-by: Martin Sustrik --- src/pipe.hpp | 201 +++++++++++++++++++++++++---------------------------------- 1 file changed, 85 insertions(+), 116 deletions(-) (limited to 'src/pipe.hpp') diff --git a/src/pipe.hpp b/src/pipe.hpp index 75b5c47..fcba877 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -22,47 +22,43 @@ #define __ZMQ_PIPE_HPP_INCLUDED__ #include "msg.hpp" -#include "array.hpp" #include "ypipe.hpp" #include "config.hpp" #include "object.hpp" -#include "stdint.hpp" +#include "array.hpp" namespace zmq { - // Creates a pipe. Returns pointer to reader and writer objects. - void create_pipe (object_t *reader_parent_, object_t *writer_parent_, - int hwm_, class reader_t **reader_, class writer_t **writer_); - - // The shutdown mechanism for pipe works as follows: Either endpoint - // (or even both of them) can ask pipe to terminate by calling 'terminate' - // method. Pipe then terminates in asynchronous manner. When the part of - // the shutdown tied to the endpoint is done it triggers 'terminated' - // event. When endpoint processes the event and returns, associated - // reader/writer object is deallocated. - - typedef ypipe_t pipe_t; + // Create a pipepair for bi-directional transfer of messages. + // First HWM is for messages passed from first pipe to the second pipe. + // Second HWM is for messages passed from second pipe to the first pipe. + // Delay specifies whether the pipe receives all the pending messages + // before terminating or whether it terminates straight away. + int pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], + int hwms_ [2], bool delays_ [2]); - struct i_reader_events + struct i_pipe_events { - virtual ~i_reader_events () {} + virtual ~i_pipe_events () {} - virtual void terminated (class reader_t *pipe_) = 0; - virtual void activated (class reader_t *pipe_) = 0; - virtual void delimited (class reader_t *pipe_) = 0; + virtual void read_activated (class pipe_t *pipe_) = 0; + virtual void write_activated (class pipe_t *pipe_) = 0; + virtual void terminated (class pipe_t *pipe_) = 0; }; - class reader_t : public object_t, public array_item_t + class pipe_t : + public object_t, + public array_item_t { - friend void create_pipe (object_t*, object_t*, int, - reader_t**, writer_t**); - friend class writer_t; + // This allows pipepair to create pipe objects. + friend int pipepair (class object_t *parents_ [2], + class pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]); public: - // Specifies the object to get events from the reader. - void set_event_sink (i_reader_events *endpoint_); + // Specifies the object to send events to. + void set_event_sink (i_pipe_events *sink_); // Returns true if there is at least one message to read in the pipe. bool check_read (); @@ -70,127 +66,100 @@ namespace zmq // Reads a message to the underlying pipe. bool read (msg_t *msg_); - // Ask pipe to terminate. - void terminate (); - - private: - - reader_t (class object_t *parent_, pipe_t *pipe_, int lwm_); - ~reader_t (); - - // To be called only by writer itself! - void set_writer (class writer_t *writer_); - - // Command handlers. - void process_activate_reader (); - void process_pipe_term_ack (); - - // Returns true if the message is delimiter; false otherwise. - static bool is_delimiter (msg_t &msg_); - - // True, if pipe can be read from. - bool active; - - // The underlying pipe. - pipe_t *pipe; - - // Pipe writer associated with the other side of the pipe. - class writer_t *writer; - - // Low watermark for in-memory storage (in bytes). - int lwm; - - // Number of messages read so far. - uint64_t msgs_read; - - // Sink for the events (either the socket of the session). - i_reader_events *sink; - - // True is 'terminate' method was called or delimiter - // was read from the pipe. - bool terminating; - - reader_t (const reader_t&); - const reader_t &operator = (const reader_t&); - }; - - struct i_writer_events - { - virtual ~i_writer_events () {} - - virtual void terminated (class writer_t *pipe_) = 0; - virtual void activated (class writer_t *pipe_) = 0; - }; - - class writer_t : public object_t, public array_item_t - { - friend void create_pipe (object_t*, object_t*, int, - reader_t**, writer_t**); - - public: - - // Specifies the object to get events from the writer. - void set_event_sink (i_writer_events *endpoint_); - - // Checks whether messages can be written to the pipe. - // If writing the message would cause high watermark - // the function returns false. + // Checks whether messages can be written to the pipe. If writing + // the message would cause high watermark the function returns false. bool check_write (msg_t *msg_); // Writes a message to the underlying pipe. Returns false if the // message cannot be written because high watermark was reached. bool write (msg_t *msg_); - // Remove unfinished part of a message from the pipe. + // Remove unfinished parts of the outbound message from the pipe. void rollback (); // Flush the messages downsteam. void flush (); - // Ask pipe to terminate. + // Ask pipe to terminate. The termination will happen asynchronously + // and user will be notified about actual deallocation by 'terminated' + // event. void terminate (); private: - writer_t (class object_t *parent_, pipe_t *pipe_, reader_t *reader_, - int hwm_); - ~writer_t (); - // Command handlers. - void process_activate_writer (uint64_t msgs_read_); + void process_activate_read (); + void process_activate_write (uint64_t msgs_read_); void process_pipe_term (); + void process_pipe_term_ack (); + + // Type of the underlying lock-free pipe. + typedef ypipe_t upipe_t; - // Tests whether underlying pipe is already full. - bool pipe_full (); + // Constructor is private. Pipe can only be created using + // pipepair function. + pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, + int inhwm_, int outhwm_, bool delay_); - // True, if this object can be written to. - bool active; + // Pipepair uses this function to let us know about + // the peer pipe object. + void set_peer (pipe_t *pipe_); - // The underlying pipe. - pipe_t *pipe; + // Destructor is private. Pipe objects destroy themselves. + ~pipe_t (); - // Pipe reader associated with the other side of the pipe. - reader_t *reader; + // Underlying pipes for both directions. + upipe_t *inpipe; + upipe_t *outpipe; - // High watermark for in-memory storage (in bytes). + // Can the pipe be read from / written to? + bool in_active; + bool out_active; + + // High watermark for the outbound pipe. int hwm; - // Last confirmed number of messages read from the pipe. - // The actual number can be higher. - uint64_t msgs_read; + // Low watermark for the inbound pipe. + int lwm; - // Number of messages we have written so far. + // Number of messages read and written so far. + uint64_t msgs_read; uint64_t msgs_written; - // Sink for the events (either the socket or the session). - i_writer_events *sink; + // Last received peer's msgs_read. The actual number in the peer + // can be higher at the moment. + uint64_t peers_msgs_read; - // True is 'terminate' method was called of 'pipe_term' command - // arrived from the reader. + // The pipe object on the other side of the pipepair. + pipe_t *peer; + + // Sink to send events to. + i_pipe_events *sink; + + // True is 'terminate' method was called or termination request + // was received from the peer. bool terminating; - writer_t (const writer_t&); - const writer_t &operator = (const writer_t&); + // True is we've already got pipe_term command from the peer. + bool term_recvd; + + // True if delimiter was already received from the peer. + bool delimited; + + // If true, we receive all the pending inbound messages before + // terminating. If false, we terminate immediately when the peer + // asks us to. + bool delay; + + // Returns true if the message is delimiter; false otherwise. + static bool is_delimiter (msg_t &msg_); + + // Computes appropriate low watermark from the given high watermark. + static int compute_lwm (int hwm_); + + // Disable copying. + pipe_t (const pipe_t&); + const pipe_t &operator = (const pipe_t&); }; } -- cgit v1.2.3