From acf0b0e515515e51ad32ba7a2d147ce703579478 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 22 May 2011 17:26:53 +0200 Subject: Introduces bi-directional pipes So far, there was a pair of unidirectional pipes between a socket and a session (or an inproc peer). This resulted in complex problems with half-closed states and tracking which inpipe corresponds to which outpipe. This patch doesn't add any functionality in itself, but is essential for further work on features like subscription forwarding. Signed-off-by: Martin Sustrik --- src/pipe.cpp | 335 ++++++++++++++++++++++++++++------------------------------- 1 file changed, 161 insertions(+), 174 deletions(-) (limited to 'src/pipe.cpp') diff --git a/src/pipe.cpp b/src/pipe.cpp index 36dc808..fb03042 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -19,100 +19,123 @@ */ #include +#include #include "pipe.hpp" -#include "likely.hpp" +#include "err.hpp" -zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, int lwm_) : +int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], + int hwms_ [2], bool delays_ [2]) +{ + // Creates two pipe objects. These objects are connected by two ypipes, + // each to pass messages in one direction. + + pipe_t::upipe_t *upipe1 = new (std::nothrow) pipe_t::upipe_t (); + alloc_assert (upipe1); + pipe_t::upipe_t *upipe2 = new (std::nothrow) pipe_t::upipe_t (); + alloc_assert (upipe2); + + pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, + hwms_ [1], hwms_ [0], delays_ [0]); + alloc_assert (pipes_ [0]); + pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, + hwms_ [0], hwms_ [1], delays_ [1]); + alloc_assert (pipes_ [1]); + + pipes_ [0]->set_peer (pipes_ [1]); + pipes_ [1]->set_peer (pipes_ [0]); + + return 0; +} + +zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, + int inhwm_, int outhwm_, bool delay_) : object_t (parent_), - active (true), - pipe (pipe_), - writer (NULL), - lwm (lwm_), + inpipe (inpipe_), + outpipe (outpipe_), + in_active (true), + out_active (true), + hwm (outhwm_), + lwm (compute_lwm (inhwm_)), msgs_read (0), + msgs_written (0), + peers_msgs_read (0), + peer (NULL), sink (NULL), - terminating (false) + terminating (false), + term_recvd (false), + delimited (false), + delay (delay_) { - // Note that writer is not set here. Writer will inform reader about its - // address once it is created (via set_writer method). } -void zmq::reader_t::set_writer (writer_t *writer_) +zmq::pipe_t::~pipe_t () { - zmq_assert (!writer); - writer = writer_; } -zmq::reader_t::~reader_t () +void zmq::pipe_t::set_peer (pipe_t *peer_) { - // Pipe as such is owned and deallocated by reader object. - // The point is that reader processes the last step of terminal - // handshaking (term_ack). - zmq_assert (pipe); - - // First delete all the unread messages in the pipe. We have to do it by - // hand because msg_t doesn't have automatic destructor. - msg_t msg; - while (pipe->read (&msg)) { - int rc = msg.close (); - errno_assert (rc == 0); - } - - delete pipe; + // Peer can be set once only. + zmq_assert (!peer); + peer = peer_; } -void zmq::reader_t::set_event_sink (i_reader_events *sink_) +void zmq::pipe_t::set_event_sink (i_pipe_events *sink_) { + // Sink can be set once only. zmq_assert (!sink); sink = sink_; } -bool zmq::reader_t::is_delimiter (msg_t &msg_) -{ - return msg_.is_delimiter (); -} - -bool zmq::reader_t::check_read () +bool zmq::pipe_t::check_read () { - if (!active) + if (unlikely (!in_active)) return false; // Check if there's an item in the pipe. - if (!pipe->check_read ()) { - active = false; + if (!inpipe->check_read ()) { + in_active = false; return false; } // If the next item in the pipe is message delimiter, - // initiate its termination. - if (pipe->probe (is_delimiter)) { + // initiate termination process. + if (inpipe->probe (is_delimiter)) { msg_t msg; - bool ok = pipe->read (&msg); + bool ok = inpipe->read (&msg); zmq_assert (ok); - if (sink) - sink->delimited (this); - terminate (); + delimited = true; + + // If pipe_term was already received but wasn't processed because + // of pending messages, we can ack it now. + if (terminating) + send_pipe_term_ack (peer); + return false; } return true; } -bool zmq::reader_t::read (msg_t *msg_) +bool zmq::pipe_t::read (msg_t *msg_) { - if (!active) + if (unlikely (!in_active)) return false; - if (!pipe->read (msg_)) { - active = false; + if (!inpipe->read (msg_)) { + in_active = false; return false; } // If delimiter was read, start termination process of the pipe. if (msg_->is_delimiter ()) { - if (sink) - sink->delimited (this); - terminate (); + delimited = true; + + // If pipe_term was already received but wasn't processed because + // of pending messages, we can ack it now. + if (terminating) + send_pipe_term_ack (peer); + return false; } @@ -120,175 +143,148 @@ bool zmq::reader_t::read (msg_t *msg_) msgs_read++; if (lwm > 0 && msgs_read % lwm == 0) - send_activate_writer (writer, msgs_read); + send_activate_write (peer, msgs_read); return true; } -void zmq::reader_t::terminate () +bool zmq::pipe_t::check_write (msg_t *msg_) { - // If termination was already started by the peer, do nothing. - if (terminating) - return; + if (unlikely (!out_active)) + return false; - active = false; - terminating = true; - send_pipe_term (writer); -} + bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm); -void zmq::reader_t::process_activate_reader () -{ - // Forward the event to the sink (either socket or session). - active = true; - sink->activated (this); + if (unlikely (full)) { + out_active = false; + return false; + } + + return true; } -void zmq::reader_t::process_pipe_term_ack () +bool zmq::pipe_t::write (msg_t *msg_) { - // At this point writer may already be deallocated. - // For safety's sake drop the reference to it. - writer = NULL; + if (unlikely (!check_write (msg_))) + return false; - // Notify owner about the termination. - zmq_assert (sink); - sink->terminated (this); + outpipe->write (*msg_, msg_->flags () & msg_t::more); + if (!(msg_->flags () & msg_t::more)) + msgs_written++; - // Deallocate resources. - delete this; + return true; } -zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_, - int hwm_) : - object_t (parent_), - active (true), - pipe (pipe_), - reader (reader_), - hwm (hwm_), - msgs_read (0), - msgs_written (0), - sink (NULL), - terminating (false) +void zmq::pipe_t::rollback () { - // Inform reader about the writer. - reader->set_writer (this); + // Remove incomplete message from the outbound pipe. + msg_t msg; + while (outpipe->unwrite (&msg)) { + zmq_assert (msg.flags () & msg_t::more); + int rc = msg.close (); + errno_assert (rc == 0); + } } -zmq::writer_t::~writer_t () +void zmq::pipe_t::flush () { + if (!outpipe->flush ()) + send_activate_read (peer); } -void zmq::writer_t::set_event_sink (i_writer_events *sink_) +void zmq::pipe_t::process_activate_read () { - zmq_assert (!sink); - sink = sink_; + if (!in_active && !terminating) { + in_active = true; + sink->read_activated (this); + } } -bool zmq::writer_t::check_write (msg_t *msg_) +void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) { - // We've already checked and there's no space free for the new message. - // There's no point in checking once again. - if (unlikely (!active)) - return false; + // Remember the peers's message sequence number. + peers_msgs_read = msgs_read_; - if (unlikely (pipe_full ())) { - active = false; - return false; + if (!out_active && !terminating) { + out_active = true; + sink->write_activated (this); } - - return true; } -bool zmq::writer_t::write (msg_t *msg_) +void zmq::pipe_t::process_pipe_term () { - if (unlikely (!check_write (msg_))) - return false; - - pipe->write (*msg_, msg_->flags () & msg_t::more); - if (!(msg_->flags () & msg_t::more)) - msgs_written++; - - return true; + term_recvd = true; + + // We can proceed with the termination if one of the following is true: + // 1. User asked this side of pipe to terminate already. + // 2. Waiting for pending messages in not required. + // 3. Delimiter was already received. + if (terminating || !delay || delimited) { + terminating = true; + send_pipe_term_ack (peer); + } } -void zmq::writer_t::rollback () +void zmq::pipe_t::process_pipe_term_ack () { - // Remove incomplete message from the pipe. + // Notify the user that all the references to the pipe should be dropped. + zmq_assert (sink); + sink->terminated (this); + + // If the peer haven't asked for the termination itself, we have to + // ack the ack, so that it can deallocate properly. + if (!term_recvd) + send_pipe_term_ack (peer); + + // We'll deallocate the inbound pipe, the peer will deallocate the outbound + // pipe (which is an inbound pipe from its point of view). + // First, delete all the unread messages in the pipe. We have to do it by + // hand because msg_t doesn't have automatic destructor. Then deallocate + // the ypipe itself. msg_t msg; - while (pipe->unwrite (&msg)) { - zmq_assert (msg.flags () & msg_t::more); - int rc = msg.close (); - errno_assert (rc == 0); + while (inpipe->read (&msg)) { + int rc = msg.close (); + errno_assert (rc == 0); } -} + delete inpipe; -void zmq::writer_t::flush () -{ - if (!pipe->flush ()) - send_activate_reader (reader); + // Deallocate the pipe object + delete this; } -void zmq::writer_t::terminate () +void zmq::pipe_t::terminate () { // Prevent double termination. if (terminating) return; terminating = true; - // Mark the pipe as not available for writing. - active = false; + // Stop inbound and outbound flow of messages. + in_active = false; + out_active = false; - // Rollback any unfinished messages. + // Rollback any unfinished outbound messages. rollback (); - // Push delimiter into the pipe. Trick the compiler to belive that - // the tag is a valid pointer. Note that watermarks are not checked - // thus the delimiter can be written even though the pipe is full. + // Push delimiter into the outbound pipe. Note that watermarks are not + // checked thus the delimiter can be written even though the pipe is full. msg_t msg; msg.init_delimiter (); - pipe->write (msg, false); + outpipe->write (msg, false); flush (); -} - -void zmq::writer_t::process_activate_writer (uint64_t msgs_read_) -{ - // Store the reader's message sequence number. - msgs_read = msgs_read_; - - // If the writer was non-active before, let's make it active - // (available for writing messages to). - if (!active && !terminating) { - active = true; - zmq_assert (sink); - sink->activated (this); - } -} - -void zmq::writer_t::process_pipe_term () -{ - send_pipe_term_ack (reader); - - // The above command allows reader to deallocate itself and the pipe. - // For safety's sake we'll drop the pointers here. - reader = NULL; - pipe = NULL; - // Notify owner about the termination. - zmq_assert (sink); - sink->terminated (this); - - // Deallocate the resources. - delete this; + // Start the termination handshaking. + send_pipe_term (peer); } -bool zmq::writer_t::pipe_full () +bool zmq::pipe_t::is_delimiter (msg_t &msg_) { - return hwm > 0 && msgs_written - msgs_read == uint64_t (hwm); + return msg_.is_delimiter (); } -void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_, - int hwm_, reader_t **reader_, writer_t **writer_) +int zmq::pipe_t::compute_lwm (int hwm_) { - // First compute the low water mark. Following point should be taken + // Compute the low water mark. Following point should be taken // into consideration: // // 1. LWM has to be less than HWM. @@ -308,17 +304,8 @@ void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_, // That done, we still we have to account for the cases where // HWM < max_wm_delta thus driving LWM to negative numbers. // Let's make LWM 1/2 of HWM in such cases. - int lwm = (hwm_ > max_wm_delta * 2) ? + int result = (hwm_ > max_wm_delta * 2) ? hwm_ - max_wm_delta : (hwm_ + 1) / 2; - // Create all three objects pipe consists of: the pipe per se, reader and - // writer. The pipe will be handled by reader and writer, its never passed - // to the user. Reader and writer are returned to the user. - pipe_t *pipe = new (std::nothrow) pipe_t (); - alloc_assert (pipe); - *reader_ = new (std::nothrow) reader_t (reader_parent_, pipe, lwm); - alloc_assert (*reader_); - *writer_ = new (std::nothrow) writer_t (writer_parent_, pipe, *reader_, - hwm_); - alloc_assert (*writer_); + return result; } -- cgit v1.2.3