summaryrefslogtreecommitdiff
path: root/src/pipe.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/pipe.cpp')
-rw-r--r--src/pipe.cpp57
1 files changed, 46 insertions, 11 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index da019c1..53dfb21 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -28,8 +28,12 @@ zmq::reader_t::reader_t (object_t *parent_,
peer (NULL),
hwm (hwm_),
lwm (lwm_),
+ msgs_read (0),
endpoint (NULL)
{
+ // Adjust lwm and hwm.
+ if (lwm == 0 || lwm > hwm)
+ lwm = hwm;
}
zmq::reader_t::~reader_t ()
@@ -73,7 +77,9 @@ bool zmq::reader_t::read (zmq_msg_t *msg_)
return false;
}
- // TODO: Adjust the size of the pipe.
+ msgs_read++;
+ if (lwm > 0 && msgs_read % lwm == 0)
+ send_reader_info (peer, msgs_read);
return true;
}
@@ -111,8 +117,14 @@ zmq::writer_t::writer_t (object_t *parent_,
peer (NULL),
hwm (hwm_),
lwm (lwm_),
+ msgs_read (0),
+ msgs_written (0),
+ stalled (false),
endpoint (NULL)
{
+ // Adjust lwm and hwm.
+ if (lwm == 0 || lwm > hwm)
+ lwm = hwm;
}
void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
@@ -131,32 +143,41 @@ void zmq::writer_t::set_pipe (pipe_t *pipe_)
peer = &pipe->reader;
}
-bool zmq::writer_t::check_write (uint64_t size_)
+bool zmq::writer_t::check_write ()
{
- // TODO: Check whether hwm is exceeded.
+ if (pipe_full ()) {
+ stalled = true;
+ return false;
+ }
return true;
}
bool zmq::writer_t::write (zmq_msg_t *msg_)
{
+ if (pipe_full ()) {
+ stalled = true;
+ return false;
+ }
+
pipe->write (*msg_);
+ msgs_written++;
return true;
-
- // TODO: Adjust size of the pipe.
}
void zmq::writer_t::rollback ()
{
- while (true) {
- zmq_msg_t msg;
- if (!pipe->unwrite (&msg))
- break;
+ zmq_msg_t msg;
+
+ while (pipe->unwrite (&msg)) {
zmq_msg_close (&msg);
+ msgs_written--;
}
- // TODO: We don't have to inform the reader side of the pipe about
- // the event. We'll simply adjust the pipe size and keep calm.
+ if (stalled && endpoint != NULL && !pipe_full()) {
+ stalled = false;
+ endpoint->revive (this);
+ }
}
void zmq::writer_t::flush ()
@@ -179,6 +200,15 @@ void zmq::writer_t::term ()
pipe->flush ();
}
+void zmq::writer_t::process_reader_info (uint64_t msgs_read_)
+{
+ msgs_read = msgs_read_;
+ if (stalled && endpoint != NULL) {
+ stalled = false;
+ endpoint->revive (this);
+ }
+}
+
void zmq::writer_t::process_pipe_term ()
{
if (endpoint)
@@ -189,6 +219,11 @@ void zmq::writer_t::process_pipe_term ()
send_pipe_term_ack (p);
}
+bool zmq::writer_t::pipe_full ()
+{
+ return hwm > 0 && msgs_written - msgs_read == hwm;
+}
+
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, uint64_t lwm_) :
reader (reader_parent_, hwm_, lwm_),