summaryrefslogtreecommitdiff
path: root/src/pipe.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/pipe.cpp')
-rw-r--r--src/pipe.cpp146
1 files changed, 114 insertions, 32 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 1df64e9..200beb0 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -21,20 +21,14 @@
#include "pipe.hpp"
-zmq::reader_t::reader_t (object_t *parent_,
- uint64_t hwm_, uint64_t lwm_) :
+zmq::reader_t::reader_t (object_t *parent_, uint64_t lwm_) :
object_t (parent_),
pipe (NULL),
peer (NULL),
- hwm (hwm_),
lwm (lwm_),
msgs_read (0),
endpoint (NULL)
-{
- // Adjust lwm and hwm.
- if (lwm == 0 || lwm > hwm)
- lwm = hwm;
-}
+{}
zmq::reader_t::~reader_t ()
{
@@ -50,15 +44,32 @@ void zmq::reader_t::set_pipe (pipe_t *pipe_)
register_pipe (pipe);
}
+bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_)
+{
+ unsigned char *offset = 0;
+
+ return msg_.content == (void*) (offset + ZMQ_DELIMITER);
+}
+
bool zmq::reader_t::check_read ()
{
// Check if there's an item in the pipe.
- if (pipe->check_read ())
- return true;
-
// If not, deactivate the pipe.
- endpoint->kill (this);
- return false;
+ if (!pipe->check_read ()) {
+ endpoint->kill (this);
+ return false;
+ }
+
+ // If the next item in the pipe is message delimiter,
+ // initiate its termination.
+ if (pipe->probe (is_delimiter)) {
+ if (endpoint)
+ endpoint->detach_inpipe (this);
+ term ();
+ return false;
+ }
+
+ return true;
}
bool zmq::reader_t::read (zmq_msg_t *msg_)
@@ -113,20 +124,28 @@ void zmq::reader_t::process_pipe_term_ack ()
}
zmq::writer_t::writer_t (object_t *parent_,
- uint64_t hwm_, uint64_t lwm_) :
+ uint64_t hwm_, int64_t swap_size_) :
object_t (parent_),
pipe (NULL),
peer (NULL),
hwm (hwm_),
- lwm (lwm_),
msgs_read (0),
msgs_written (0),
+ msg_store (NULL),
+ extra_msg_flag (false),
stalled (false),
+ pending_close (false),
endpoint (NULL)
{
- // Adjust lwm and hwm.
- if (lwm == 0 || lwm > hwm)
- lwm = hwm;
+ if (swap_size_ > 0) {
+ msg_store = new (std::nothrow) msg_store_t (swap_size_);
+ if (msg_store != NULL) {
+ if (msg_store->init () < 0) {
+ delete msg_store;
+ msg_store = NULL;
+ }
+ }
+ }
}
void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
@@ -136,6 +155,10 @@ void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
zmq::writer_t::~writer_t ()
{
+ if (extra_msg_flag)
+ zmq_msg_close (&extra_msg);
+
+ delete msg_store;
}
void zmq::writer_t::set_pipe (pipe_t *pipe_)
@@ -147,7 +170,7 @@ void zmq::writer_t::set_pipe (pipe_t *pipe_)
bool zmq::writer_t::check_write ()
{
- if (pipe_full ()) {
+ if (pipe_full () && (msg_store == NULL || msg_store->full () || extra_msg_flag)) {
stalled = true;
return false;
}
@@ -157,29 +180,45 @@ bool zmq::writer_t::check_write ()
bool zmq::writer_t::write (zmq_msg_t *msg_)
{
- if (pipe_full ()) {
- stalled = true;
+ if (!check_write ())
return false;
+
+ if (pipe_full ()) {
+ if (msg_store->store (msg_)) {
+ if (!(msg_->flags & ZMQ_MSG_MORE))
+ msg_store->commit ();
+ } else {
+ extra_msg = *msg_;
+ extra_msg_flag = true;
+ }
+ }
+ else {
+ pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);
+ if (!(msg_->flags & ZMQ_MSG_MORE))
+ msgs_written++;
}
- pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);
- if (!(msg_->flags & ZMQ_MSG_MORE))
- msgs_written++;
return true;
}
void zmq::writer_t::rollback ()
{
- zmq_msg_t msg;
+ if (extra_msg_flag && extra_msg.flags & ZMQ_MSG_MORE) {
+ zmq_msg_close (&extra_msg);
+ extra_msg_flag = false;
+ }
+ if (msg_store != NULL)
+ msg_store->rollback ();
+
+ zmq_msg_t msg;
// Remove all incomplete messages from the pipe.
while (pipe->unwrite (&msg)) {
zmq_assert (msg.flags & ZMQ_MSG_MORE);
zmq_msg_close (&msg);
- msgs_written--;
}
- if (stalled && endpoint != NULL && !pipe_full()) {
+ if (stalled && endpoint != NULL && check_write ()) {
stalled = false;
endpoint->revive (this);
}
@@ -198,6 +237,14 @@ void zmq::writer_t::term ()
// Rollback any unfinished messages.
rollback ();
+ if (msg_store == NULL || (msg_store->empty () && !extra_msg_flag))
+ write_delimiter ();
+ else
+ pending_close = true;
+}
+
+void zmq::writer_t::write_delimiter ()
+{
// Push delimiter into the pipe.
// Trick the compiler to belive that the tag is a valid pointer.
zmq_msg_t msg;
@@ -205,12 +252,47 @@ void zmq::writer_t::term ()
msg.content = (void*) (offset + ZMQ_DELIMITER);
msg.flags = 0;
pipe->write (msg, false);
- pipe->flush ();
+ flush ();
}
void zmq::writer_t::process_reader_info (uint64_t msgs_read_)
{
+ zmq_msg_t msg;
+
msgs_read = msgs_read_;
+ if (msg_store) {
+
+ // Move messages from backing store into pipe.
+ while (!pipe_full () && !msg_store->empty ()) {
+ msg_store->fetch(&msg);
+ // Write message into the pipe.
+ pipe->write (msg, msg.flags & ZMQ_MSG_MORE);
+ if (!(msg.flags & ZMQ_MSG_MORE))
+ msgs_written++;
+ }
+
+ if (extra_msg_flag) {
+ if (!pipe_full ()) {
+ pipe->write (extra_msg, extra_msg.flags & ZMQ_MSG_MORE);
+ if (!(extra_msg.flags & ZMQ_MSG_MORE))
+ msgs_written++;
+ extra_msg_flag = false;
+ }
+ else if (msg_store->store (&extra_msg)) {
+ if (!(extra_msg.flags & ZMQ_MSG_MORE))
+ msg_store->commit ();
+ extra_msg_flag = false;
+ }
+ }
+
+ if (pending_close && msg_store->empty () && !extra_msg_flag) {
+ write_delimiter ();
+ pending_close = false;
+ }
+
+ flush ();
+ }
+
if (stalled && endpoint != NULL) {
stalled = false;
endpoint->revive (this);
@@ -233,9 +315,9 @@ bool zmq::writer_t::pipe_full ()
}
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
- uint64_t hwm_) :
- reader (reader_parent_, hwm_, compute_lwm (hwm_)),
- writer (writer_parent_, hwm_, compute_lwm (hwm_))
+ uint64_t hwm_, int64_t swap_size_) :
+ reader (reader_parent_, compute_lwm (hwm_)),
+ writer (writer_parent_, hwm_, swap_size_)
{
reader.set_pipe (this);
writer.set_pipe (this);
@@ -276,6 +358,6 @@ uint64_t zmq::pipe_t::compute_lwm (uint64_t hwm_)
if (hwm_ > max_wm_delta * 2)
return hwm_ - max_wm_delta;
else
- return hwm_ / 2;
+ return (hwm_ + 1) / 2;
}