summaryrefslogtreecommitdiff
path: root/src/pipe.cpp
diff options
context:
space:
mode:
authorMartin Hurton <hurtonm@gmail.com>2010-06-21 15:06:51 +0200
committerMartin Hurton <hurtonm@gmail.com>2010-06-21 15:06:51 +0200
commitfca2e8e8cc30bcd134839f6d0f5f9963323dad2b (patch)
treefceade0043c87c74fec8b6ad3de00b1090feaa9c /src/pipe.cpp
parent10c28c1fc2f06c93e12a7c60f79a315cec7c5a52 (diff)
Add SWAP support
Diffstat (limited to 'src/pipe.cpp')
-rw-r--r--src/pipe.cpp114
1 files changed, 90 insertions, 24 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index ff64716..b5c656d 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -21,20 +21,14 @@
#include "pipe.hpp"
-zmq::reader_t::reader_t (object_t *parent_,
- uint64_t hwm_, uint64_t lwm_) :
+zmq::reader_t::reader_t (object_t *parent_, uint64_t lwm_) :
object_t (parent_),
pipe (NULL),
peer (NULL),
- hwm (hwm_),
lwm (lwm_),
msgs_read (0),
endpoint (NULL)
-{
- // Adjust lwm and hwm.
- if (lwm == 0 || lwm > hwm)
- lwm = hwm;
-}
+{}
zmq::reader_t::~reader_t ()
{
@@ -113,20 +107,28 @@ void zmq::reader_t::process_pipe_term_ack ()
}
zmq::writer_t::writer_t (object_t *parent_,
- uint64_t hwm_, uint64_t lwm_) :
+ uint64_t hwm_, int64_t swap_size_) :
object_t (parent_),
pipe (NULL),
peer (NULL),
hwm (hwm_),
- lwm (lwm_),
msgs_read (0),
msgs_written (0),
+ msg_store (NULL),
+ extra_msg_flag (false),
stalled (false),
+ pending_close (false),
endpoint (NULL)
{
- // Adjust lwm and hwm.
- if (lwm == 0 || lwm > hwm)
- lwm = hwm;
+ if (swap_size_ > 0) {
+ msg_store = new (std::nothrow) msg_store_t (swap_size_);
+ if (msg_store != NULL) {
+ if (msg_store->init () < 0) {
+ delete msg_store;
+ msg_store = NULL;
+ }
+ }
+ }
}
void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
@@ -136,6 +138,10 @@ void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
zmq::writer_t::~writer_t ()
{
+ if (extra_msg_flag)
+ zmq_msg_close (&extra_msg);
+
+ delete msg_store;
}
void zmq::writer_t::set_pipe (pipe_t *pipe_)
@@ -147,7 +153,7 @@ void zmq::writer_t::set_pipe (pipe_t *pipe_)
bool zmq::writer_t::check_write ()
{
- if (pipe_full ()) {
+ if (pipe_full () && (msg_store == NULL || msg_store->full () || extra_msg_flag)) {
stalled = true;
return false;
}
@@ -157,28 +163,45 @@ bool zmq::writer_t::check_write ()
bool zmq::writer_t::write (zmq_msg_t *msg_)
{
- if (pipe_full ()) {
- stalled = true;
+ if (!check_write ())
return false;
+
+ if (pipe_full ()) {
+ if (msg_store->store (msg_)) {
+ if (!(msg_->flags & ZMQ_MSG_MORE))
+ msg_store->commit ();
+ } else {
+ extra_msg = *msg_;
+ extra_msg_flag = true;
+ }
+ }
+ else {
+ pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);
+ if (!(msg_->flags & ZMQ_MSG_MORE))
+ msgs_written++;
}
- pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);
- if (!(msg_->flags & ZMQ_MSG_MORE))
- msgs_written++;
return true;
}
void zmq::writer_t::rollback ()
{
- zmq_msg_t msg;
+ if (extra_msg_flag && extra_msg.flags & ZMQ_MSG_MORE) {
+ zmq_msg_close (&extra_msg);
+ extra_msg_flag = false;
+ }
+
+ if (msg_store != NULL)
+ msg_store->rollback ();
+ zmq_msg_t msg;
// Remove all incomplete messages from the pipe.
while (pipe->unwrite (&msg)) {
zmq_assert (msg.flags & ZMQ_MSG_MORE);
zmq_msg_close (&msg);
}
- if (stalled && endpoint != NULL && !pipe_full()) {
+ if (stalled && endpoint != NULL && check_write ()) {
stalled = false;
endpoint->revive (this);
}
@@ -197,6 +220,14 @@ void zmq::writer_t::term ()
// Rollback any unfinished messages.
rollback ();
+ if (msg_store == NULL || (msg_store->empty () && !extra_msg_flag))
+ write_delimiter ();
+ else
+ pending_close = true;
+}
+
+void zmq::writer_t::write_delimiter ()
+{
// Push delimiter into the pipe.
// Trick the compiler to belive that the tag is a valid pointer.
zmq_msg_t msg;
@@ -209,7 +240,42 @@ void zmq::writer_t::term ()
void zmq::writer_t::process_reader_info (uint64_t msgs_read_)
{
+ zmq_msg_t msg;
+
msgs_read = msgs_read_;
+ if (msg_store) {
+
+ // Move messages from backing store into pipe.
+ while (!pipe_full () && !msg_store->empty ()) {
+ msg_store->fetch(&msg);
+ // Write message into the pipe.
+ pipe->write (msg, msg.flags & ZMQ_MSG_MORE);
+ if (!(msg.flags & ZMQ_MSG_MORE))
+ msgs_written++;
+ }
+
+ if (extra_msg_flag) {
+ if (!pipe_full ()) {
+ pipe->write (extra_msg, extra_msg.flags & ZMQ_MSG_MORE);
+ if (!(extra_msg.flags & ZMQ_MSG_MORE))
+ msgs_written++;
+ extra_msg_flag = false;
+ }
+ else if (msg_store->store (&extra_msg)) {
+ if (!(extra_msg.flags & ZMQ_MSG_MORE))
+ msg_store->commit ();
+ extra_msg_flag = false;
+ }
+ }
+
+ if (pending_close && msg_store->empty () && !extra_msg_flag) {
+ write_delimiter ();
+ pending_close = false;
+ }
+
+ flush ();
+ }
+
if (stalled && endpoint != NULL) {
stalled = false;
endpoint->revive (this);
@@ -232,9 +298,9 @@ bool zmq::writer_t::pipe_full ()
}
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
- uint64_t hwm_) :
- reader (reader_parent_, hwm_, compute_lwm (hwm_)),
- writer (writer_parent_, hwm_, compute_lwm (hwm_))
+ uint64_t hwm_, int64_t swap_size_) :
+ reader (reader_parent_, compute_lwm (hwm_)),
+ writer (writer_parent_, hwm_, swap_size_)
{
reader.set_pipe (this);
writer.set_pipe (this);