diff options
Diffstat (limited to 'src/pipe.cpp')
-rw-r--r-- | src/pipe.cpp | 57 |
1 files changed, 46 insertions, 11 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index da019c1..53dfb21 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -28,8 +28,12 @@ zmq::reader_t::reader_t (object_t *parent_, peer (NULL), hwm (hwm_), lwm (lwm_), + msgs_read (0), endpoint (NULL) { + // Adjust lwm and hwm. + if (lwm == 0 || lwm > hwm) + lwm = hwm; } zmq::reader_t::~reader_t () @@ -73,7 +77,9 @@ bool zmq::reader_t::read (zmq_msg_t *msg_) return false; } - // TODO: Adjust the size of the pipe. + msgs_read++; + if (lwm > 0 && msgs_read % lwm == 0) + send_reader_info (peer, msgs_read); return true; } @@ -111,8 +117,14 @@ zmq::writer_t::writer_t (object_t *parent_, peer (NULL), hwm (hwm_), lwm (lwm_), + msgs_read (0), + msgs_written (0), + stalled (false), endpoint (NULL) { + // Adjust lwm and hwm. + if (lwm == 0 || lwm > hwm) + lwm = hwm; } void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) @@ -131,32 +143,41 @@ void zmq::writer_t::set_pipe (pipe_t *pipe_) peer = &pipe->reader; } -bool zmq::writer_t::check_write (uint64_t size_) +bool zmq::writer_t::check_write () { - // TODO: Check whether hwm is exceeded. + if (pipe_full ()) { + stalled = true; + return false; + } return true; } bool zmq::writer_t::write (zmq_msg_t *msg_) { + if (pipe_full ()) { + stalled = true; + return false; + } + pipe->write (*msg_); + msgs_written++; return true; - - // TODO: Adjust size of the pipe. } void zmq::writer_t::rollback () { - while (true) { - zmq_msg_t msg; - if (!pipe->unwrite (&msg)) - break; + zmq_msg_t msg; + + while (pipe->unwrite (&msg)) { zmq_msg_close (&msg); + msgs_written--; } - // TODO: We don't have to inform the reader side of the pipe about - // the event. We'll simply adjust the pipe size and keep calm. + if (stalled && endpoint != NULL && !pipe_full()) { + stalled = false; + endpoint->revive (this); + } } void zmq::writer_t::flush () @@ -179,6 +200,15 @@ void zmq::writer_t::term () pipe->flush (); } +void zmq::writer_t::process_reader_info (uint64_t msgs_read_) +{ + msgs_read = msgs_read_; + if (stalled && endpoint != NULL) { + stalled = false; + endpoint->revive (this); + } +} + void zmq::writer_t::process_pipe_term () { if (endpoint) @@ -189,6 +219,11 @@ void zmq::writer_t::process_pipe_term () send_pipe_term_ack (p); } +bool zmq::writer_t::pipe_full () +{ + return hwm > 0 && msgs_written - msgs_read == hwm; +} + zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_) : reader (reader_parent_, hwm_, lwm_), |