From 99c5d9283622a0b37ee80f83ff4875c059fc5990 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 27 Aug 2009 10:54:28 +0200 Subject: pipes added --- src/pipe.hpp | 102 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) (limited to 'src/pipe.hpp') diff --git a/src/pipe.hpp b/src/pipe.hpp index 28e4b4d..d48fc47 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -22,15 +22,117 @@ #include "../include/zmq.h" +#include "stdint.hpp" +#include "i_endpoint.hpp" #include "ypipe.hpp" #include "config.hpp" +#include "object.hpp" namespace zmq { + class reader_t : public object_t + { + public: + + reader_t (class object_t *parent_, class pipe_t *pipe_, + uint64_t hwm_, uint64_t lwm_); + ~reader_t (); + + // Reads a message to the underlying pipe. + bool read (struct zmq_msg_t *msg_); + + // Mnaipulation of index of the pipe. + void set_endpoint (i_endpoint *endpoint_); + void set_index (int index_); + int get_index (); + + private: + + // Command handlers. + void process_revive (); + + // The underlying pipe. + class pipe_t *pipe; + + // Pipe writer associated with the other side of the pipe. + class object_t *peer; + + // High and low watermarks for in-memory storage (in bytes). + uint64_t hwm; + uint64_t lwm; + + // Positions of head and tail of the pipe (in bytes). + uint64_t head; + uint64_t tail; + uint64_t last_sent_head; + + // Index of the pipe in the socket's list of inbound pipes. + int index; + + // Endpoint (either session or socket) the pipe is attached to. + i_endpoint *endpoint; + + reader_t (const reader_t&); + void operator = (const reader_t&); + }; + + class writer_t : public object_t + { + public: + + writer_t (class object_t *parent_, class pipe_t *pipe_, + uint64_t hwm_, uint64_t lwm_); + ~writer_t (); + + // Checks whether message with specified size can be written to the + // pipe. If writing the message would cause high watermark to be + // exceeded, the function returns false. + bool check_write (uint64_t size_); + + // Writes a message to the underlying pipe. Returns false if the + // message cannot be written because high watermark was reached. + bool write (struct zmq_msg_t *msg_); + + // Flush the messages downsteam. + void flush (); + + private: + + // The underlying pipe. + class pipe_t *pipe; + + // Pipe reader associated with the other side of the pipe. + class object_t *peer; + + // High and low watermarks for in-memory storage (in bytes). + uint64_t hwm; + uint64_t lwm; + + // Positions of head and tail of the pipe (in bytes). + uint64_t head; + uint64_t tail; + + writer_t (const writer_t&); + void operator = (const writer_t&); + }; + // Message pipe. class pipe_t : public ypipe_t { + public: + + pipe_t (object_t *reader_parent_, object_t *writer_parent_, + uint64_t hwm_, uint64_t lwm_); + ~pipe_t (); + + reader_t reader; + writer_t writer; + + private: + + pipe_t (const pipe_t&); + void operator = (const pipe_t&); }; } -- cgit v1.2.3