diff options
Diffstat (limited to 'src/pipe.cpp')
-rw-r--r-- | src/pipe.cpp | 420 |
1 files changed, 232 insertions, 188 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index 200beb0..a60e519 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -4,44 +4,68 @@ This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by + the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. 0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. + GNU Lesser General Public License for more details. - You should have received a copy of the Lesser GNU General Public License + You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <new> + #include "../include/zmq.h" #include "pipe.hpp" +#include "likely.hpp" -zmq::reader_t::reader_t (object_t *parent_, uint64_t lwm_) : +zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, + uint64_t lwm_) : object_t (parent_), - pipe (NULL), - peer (NULL), + active (true), + pipe (pipe_), + writer (NULL), lwm (lwm_), msgs_read (0), - endpoint (NULL) -{} + sink (NULL), + terminating (false) +{ + // Note that writer is not set here. Writer will inform reader about its + // address once it is created (via set_writer method). +} + +void zmq::reader_t::set_writer (writer_t *writer_) +{ + zmq_assert (!writer); + writer = writer_; +} zmq::reader_t::~reader_t () { - if (pipe) - unregister_pipe (pipe); + // Pipe as such is owned and deallocated by reader object. + // The point is that reader processes the last step of terminal + // handshaking (term_ack). + zmq_assert (pipe); + + // First delete all the unread messages in the pipe. We have to do it by + // hand because zmq_msg_t is a POD, not a class, so there's no associated + // destructor. + zmq_msg_t msg; + while (pipe->read (&msg)) + zmq_msg_close (&msg); + + delete pipe; } -void zmq::reader_t::set_pipe (pipe_t *pipe_) +void zmq::reader_t::set_event_sink (i_reader_events *sink_) { - zmq_assert (!pipe); - pipe = pipe_; - peer = &pipe->writer; - register_pipe (pipe); + zmq_assert (!sink); + sink = sink_; } bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_) @@ -53,19 +77,24 @@ bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_) bool zmq::reader_t::check_read () { + if (!active) + return false; + // Check if there's an item in the pipe. - // If not, deactivate the pipe. if (!pipe->check_read ()) { - endpoint->kill (this); + active = false; return false; } // If the next item in the pipe is message delimiter, // initiate its termination. if (pipe->probe (is_delimiter)) { - if (endpoint) - endpoint->detach_inpipe (this); - term (); + zmq_msg_t msg; + bool ok = pipe->read (&msg); + zmq_assert (ok); + if (sink) + sink->delimited (this); + terminate (); return false; } @@ -74,17 +103,20 @@ bool zmq::reader_t::check_read () bool zmq::reader_t::read (zmq_msg_t *msg_) { + if (!active) + return false; + if (!pipe->read (msg_)) { - endpoint->kill (this); + active = false; return false; } // If delimiter was read, start termination process of the pipe. unsigned char *offset = 0; if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) { - if (endpoint) - endpoint->detach_inpipe (this); - term (); + if (sink) + sink->delimited (this); + terminate (); return false; } @@ -92,87 +124,104 @@ bool zmq::reader_t::read (zmq_msg_t *msg_) msgs_read++; if (lwm > 0 && msgs_read % lwm == 0) - send_reader_info (peer, msgs_read); + send_activate_writer (writer, msgs_read); return true; } -void zmq::reader_t::set_endpoint (i_endpoint *endpoint_) +void zmq::reader_t::terminate () { - endpoint = endpoint_; -} + // If termination was already started by the peer, do nothing. + if (terminating) + return; -void zmq::reader_t::term () -{ - endpoint = NULL; - send_pipe_term (peer); + active = false; + terminating = true; + send_pipe_term (writer); } -void zmq::reader_t::process_revive () +void zmq::reader_t::process_activate_reader () { - // Beacuse of command throttling mechanism, incoming termination request - // may not have been processed before subsequent send. - // In that case endpoint is NULL. - if (endpoint) - endpoint->revive (this); + // Forward the event to the sink (either socket or session). + active = true; + sink->activated (this); } void zmq::reader_t::process_pipe_term_ack () { - peer = NULL; - delete pipe; + // At this point writer may already be deallocated. + // For safety's sake drop the reference to it. + writer = NULL; + + // Notify owner about the termination. + zmq_assert (sink); + sink->terminated (this); + + // Deallocate resources. + delete this; } -zmq::writer_t::writer_t (object_t *parent_, +zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_, uint64_t hwm_, int64_t swap_size_) : object_t (parent_), - pipe (NULL), - peer (NULL), + active (true), + pipe (pipe_), + reader (reader_), hwm (hwm_), msgs_read (0), msgs_written (0), - msg_store (NULL), - extra_msg_flag (false), - stalled (false), - pending_close (false), - endpoint (NULL) + swap (NULL), + sink (NULL), + swapping (false), + pending_delimiter (false), + terminating (false) { + // Inform reader about the writer. + reader->set_writer (this); + + // Open the swap file, if required. if (swap_size_ > 0) { - msg_store = new (std::nothrow) msg_store_t (swap_size_); - if (msg_store != NULL) { - if (msg_store->init () < 0) { - delete msg_store; - msg_store = NULL; - } - } + swap = new (std::nothrow) swap_t (swap_size_); + zmq_assert (swap); + int rc = swap->init (); + zmq_assert (rc == 0); } } -void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) -{ - endpoint = endpoint_; -} - zmq::writer_t::~writer_t () { - if (extra_msg_flag) - zmq_msg_close (&extra_msg); - - delete msg_store; + if (swap) + delete swap; } -void zmq::writer_t::set_pipe (pipe_t *pipe_) +void zmq::writer_t::set_event_sink (i_writer_events *sink_) { - zmq_assert (!pipe); - pipe = pipe_; - peer = &pipe->reader; + zmq_assert (!sink); + sink = sink_; } bool zmq::writer_t::check_write () { - if (pipe_full () && (msg_store == NULL || msg_store->full () || extra_msg_flag)) { - stalled = true; + // We've already checked and there's no space free for the new message. + // There's no point in checking once again. + if (unlikely (!active)) return false; + + if (unlikely (swapping)) { + if (unlikely (swap->full ())) { + active = false; + return false; + } + } + else { + if (unlikely (pipe_full ())) { + if (swap) + swapping = true; + else { + active = false; + return false; + } + } } return true; @@ -180,73 +229,67 @@ bool zmq::writer_t::check_write () bool zmq::writer_t::write (zmq_msg_t *msg_) { - if (!check_write ()) + if (unlikely (!check_write ())) return false; - if (pipe_full ()) { - if (msg_store->store (msg_)) { - if (!(msg_->flags & ZMQ_MSG_MORE)) - msg_store->commit (); - } else { - extra_msg = *msg_; - extra_msg_flag = true; - } - } - else { - pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); + if (unlikely (swapping)) { + bool stored = swap->store (msg_); + zmq_assert (stored); if (!(msg_->flags & ZMQ_MSG_MORE)) - msgs_written++; + swap->commit (); + return true; } + pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); + if (!(msg_->flags & ZMQ_MSG_MORE)) + msgs_written++; + return true; } void zmq::writer_t::rollback () { - if (extra_msg_flag && extra_msg.flags & ZMQ_MSG_MORE) { - zmq_msg_close (&extra_msg); - extra_msg_flag = false; + // Remove incomplete message from the swap. + if (unlikely (swapping)) { + swap->rollback (); + return; } - if (msg_store != NULL) - msg_store->rollback (); - + // Remove incomplete message from the pipe. zmq_msg_t msg; - // Remove all incomplete messages from the pipe. while (pipe->unwrite (&msg)) { zmq_assert (msg.flags & ZMQ_MSG_MORE); zmq_msg_close (&msg); } - - if (stalled && endpoint != NULL && check_write ()) { - stalled = false; - endpoint->revive (this); - } } void zmq::writer_t::flush () { - if (!pipe->flush ()) - send_revive (peer); + // In the swapping mode, flushing is automatically handled by swap object. + if (!swapping && !pipe->flush ()) + send_activate_reader (reader); } -void zmq::writer_t::term () +void zmq::writer_t::terminate () { - endpoint = NULL; + // Prevent double termination. + if (terminating) + return; + + // Mark the pipe as not available for writing. + active = false; // Rollback any unfinished messages. rollback (); - if (msg_store == NULL || (msg_store->empty () && !extra_msg_flag)) - write_delimiter (); - else - pending_close = true; -} + if (swapping) { + pending_delimiter = true; + return; + } -void zmq::writer_t::write_delimiter () -{ - // Push delimiter into the pipe. - // Trick the compiler to belive that the tag is a valid pointer. + // Push delimiter into the pipe. Trick the compiler to belive that + // the tag is a valid pointer. Note that watermarks are not checked + // thus the delimiter can be written even though the pipe is full. zmq_msg_t msg; const unsigned char *offset = 0; msg.content = (void*) (offset + ZMQ_DELIMITER); @@ -255,109 +298,110 @@ void zmq::writer_t::write_delimiter () flush (); } -void zmq::writer_t::process_reader_info (uint64_t msgs_read_) +void zmq::writer_t::process_activate_writer (uint64_t msgs_read_) { - zmq_msg_t msg; - + // Store the reader's message sequence number. msgs_read = msgs_read_; - if (msg_store) { - // Move messages from backing store into pipe. - while (!pipe_full () && !msg_store->empty ()) { - msg_store->fetch(&msg); - // Write message into the pipe. + // If we are in the swapping mode, we have some messages in the swap. + // Given that pipe is now ready for writing we can move part of the + // swap into the pipe. + if (swapping) { + zmq_msg_t msg; + while (!pipe_full () && !swap->empty ()) { + swap->fetch(&msg); pipe->write (msg, msg.flags & ZMQ_MSG_MORE); if (!(msg.flags & ZMQ_MSG_MORE)) msgs_written++; } - - if (extra_msg_flag) { - if (!pipe_full ()) { - pipe->write (extra_msg, extra_msg.flags & ZMQ_MSG_MORE); - if (!(extra_msg.flags & ZMQ_MSG_MORE)) - msgs_written++; - extra_msg_flag = false; - } - else if (msg_store->store (&extra_msg)) { - if (!(extra_msg.flags & ZMQ_MSG_MORE)) - msg_store->commit (); - extra_msg_flag = false; + if (!pipe->flush ()) + send_activate_reader (reader); + + // There are no more messages in the swap. We can switch into + // standard in-memory mode. + if (swap->empty ()) { + swapping = false; + + // Push delimiter into the pipe. Trick the compiler to belive that + // the tag is a valid pointer. Note that watermarks are not checked + // thus the delimiter can be written even though the pipe is full. + if (pending_delimiter) { + zmq_msg_t msg; + const unsigned char *offset = 0; + msg.content = (void*) (offset + ZMQ_DELIMITER); + msg.flags = 0; + pipe->write (msg, false); + flush (); + return; } } - - if (pending_close && msg_store->empty () && !extra_msg_flag) { - write_delimiter (); - pending_close = false; - } - - flush (); } - if (stalled && endpoint != NULL) { - stalled = false; - endpoint->revive (this); + // If the writer was non-active before, let's make it active + // (available for writing messages to). + if (!active) { + active = true; + zmq_assert (sink); + sink->activated (this); } } void zmq::writer_t::process_pipe_term () { - if (endpoint) - endpoint->detach_outpipe (this); + send_pipe_term_ack (reader); - reader_t *p = peer; - peer = NULL; - send_pipe_term_ack (p); -} + // The above command allows reader to deallocate itself and the pipe. + // For safety's sake we'll drop the pointers here. + reader = NULL; + pipe = NULL; -bool zmq::writer_t::pipe_full () -{ - return hwm > 0 && msgs_written - msgs_read == hwm; -} + // Notify owner about the termination. + zmq_assert (sink); + sink->terminated (this); -zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_, int64_t swap_size_) : - reader (reader_parent_, compute_lwm (hwm_)), - writer (writer_parent_, hwm_, swap_size_) -{ - reader.set_pipe (this); - writer.set_pipe (this); + // Deallocate the resources. + delete this; } -zmq::pipe_t::~pipe_t () +bool zmq::writer_t::pipe_full () { - // Deallocate all the unread messages in the pipe. We have to do it by - // hand because zmq_msg_t is a POD, not a class, so there's no associated - // destructor. - zmq_msg_t msg; - while (read (&msg)) - zmq_msg_close (&msg); + return hwm > 0 && msgs_written - msgs_read == hwm; } -uint64_t zmq::pipe_t::compute_lwm (uint64_t hwm_) +void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_, + uint64_t hwm_, int64_t swap_size_, reader_t **reader_, writer_t **writer_) { - // Following point should be taken into consideration when computing - // low watermark: - // - // 1. LWM has to be less than HWM. - // 2. LWM cannot be set to very low value (such as zero) as after filling - // the queue it would start to refill only after all the messages are - // read from it and thus unnecessarily hold the progress back. - // 3. LWM cannot be set to very high value (such as HWM-1) as it would - // result in lock-step filling of the queue - if a single message is read - // from a full queue, writer thread is resumed to write exactly one - // message to the queue and go back to sleep immediately. This would - // result in low performance. - // - // Given the 3. it would be good to keep HWM and LWM as far apart as - // possible to reduce the thread switching overhead to almost zero, - // say HWM-LWM should be 500 (max_wm_delta). - // - // That done, we still we have to account for the cases where HWM<500 thus - // driving LWM to negative numbers. Let's make LWM 1/2 of HWM in such cases. - - if (hwm_ > max_wm_delta * 2) - return hwm_ - max_wm_delta; - else - return (hwm_ + 1) / 2; + // First compute the low water mark. Following point should be taken + // into consideration: + // + // 1. LWM has to be less than HWM. + // 2. LWM cannot be set to very low value (such as zero) as after filling + // the queue it would start to refill only after all the messages are + // read from it and thus unnecessarily hold the progress back. + // 3. LWM cannot be set to very high value (such as HWM-1) as it would + // result in lock-step filling of the queue - if a single message is + // read from a full queue, writer thread is resumed to write exactly one + // message to the queue and go back to sleep immediately. This would + // result in low performance. + // + // Given the 3. it would be good to keep HWM and LWM as far apart as + // possible to reduce the thread switching overhead to almost zero, + // say HWM-LWM should be max_wm_delta. + // + // That done, we still we have to account for the cases where + // HWM < max_wm_delta thus driving LWM to negative numbers. + // Let's make LWM 1/2 of HWM in such cases. + uint64_t lwm = (hwm_ > max_wm_delta * 2) ? + hwm_ - max_wm_delta : (hwm_ + 1) / 2; + + // Create all three objects pipe consists of: the pipe per se, reader and + // writer. The pipe will be handled by reader and writer, its never passed + // to the user. Reader and writer are returned to the user. + pipe_t *pipe = new (std::nothrow) pipe_t (); + zmq_assert (pipe); + *reader_ = new (std::nothrow) reader_t (reader_parent_, pipe, lwm); + zmq_assert (*reader_); + *writer_ = new (std::nothrow) writer_t (writer_parent_, pipe, *reader_, + hwm_, swap_size_); + zmq_assert (*writer_); } - |