summaryrefslogtreecommitdiff
path: root/src/pipe.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/pipe.hpp')
-rw-r--r--src/pipe.hpp32
1 files changed, 23 insertions, 9 deletions
diff --git a/src/pipe.hpp b/src/pipe.hpp
index 9f57653..ece678a 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -26,6 +26,7 @@
#include "i_endpoint.hpp"
#include "yarray_item.hpp"
#include "ypipe.hpp"
+#include "msg_store.hpp"
#include "config.hpp"
#include "object.hpp"
@@ -36,8 +37,7 @@ namespace zmq
{
public:
- reader_t (class object_t *parent_,
- uint64_t hwm_, uint64_t lwm_);
+ reader_t (class object_t *parent_, uint64_t lwm_);
~reader_t ();
void set_pipe (class pipe_t *pipe_);
@@ -58,14 +58,16 @@ namespace zmq
void process_revive ();
void process_pipe_term_ack ();
+ // Returns true if the message is delimiter; false otherwise.
+ static bool is_delimiter (zmq_msg_t &msg_);
+
// The underlying pipe.
class pipe_t *pipe;
// Pipe writer associated with the other side of the pipe.
class writer_t *peer;
- // High and low watermarks for in-memory storage (in bytes).
- uint64_t hwm;
+ // Low watermark for in-memory storage (in bytes).
uint64_t lwm;
// Number of messages read so far.
@@ -82,8 +84,7 @@ namespace zmq
{
public:
- writer_t (class object_t *parent_,
- uint64_t hwm_, uint64_t lwm_);
+ writer_t (class object_t *parent_, uint64_t hwm_, int64_t swap_size_);
~writer_t ();
void set_pipe (class pipe_t *pipe_);
@@ -117,15 +118,18 @@ namespace zmq
// Tests whether the pipe is already full.
bool pipe_full ();
+ // Write special message to the pipe so that the reader
+ // can find out we are finished.
+ void write_delimiter ();
+
// The underlying pipe.
class pipe_t *pipe;
// Pipe reader associated with the other side of the pipe.
class reader_t *peer;
- // High and low watermarks for in-memory storage (in bytes).
+ // High watermark for in-memory storage (in bytes).
uint64_t hwm;
- uint64_t lwm;
// Last confirmed number of messages read from the pipe.
// The actual number can be higher.
@@ -134,9 +138,19 @@ namespace zmq
// Number of messages we have written so far.
uint64_t msgs_written;
+ // Pointer to backing store. If NULL, messages are always
+ // kept in main memory.
+ msg_store_t *msg_store;
+
+ bool extra_msg_flag;
+
+ zmq_msg_t extra_msg;
+
// True iff the last attempt to write a message has failed.
bool stalled;
+ bool pending_close;
+
// Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint;
@@ -150,7 +164,7 @@ namespace zmq
public:
pipe_t (object_t *reader_parent_, object_t *writer_parent_,
- uint64_t hwm_);
+ uint64_t hwm_, int64_t swap_size_);
~pipe_t ();
reader_t reader;