diff options
-rw-r--r-- | src/pipe.cpp | 194 | ||||
-rw-r--r-- | src/pipe.hpp | 39 |
2 files changed, 120 insertions, 113 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index 7fa7133..de20392 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -27,6 +27,7 @@ zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, uint64_t lwm_) : object_t (parent_), + active (true), pipe (pipe_), writer (NULL), lwm (lwm_), @@ -76,12 +77,14 @@ bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_) bool zmq::reader_t::check_read () { - if (unlikely (terminating)) + if (!active) return false; // Check if there's an item in the pipe. - if (!pipe->check_read ()) + if (!pipe->check_read ()) { + active = false; return false; + } // If the next item in the pipe is message delimiter, // initiate its termination. @@ -95,11 +98,13 @@ bool zmq::reader_t::check_read () bool zmq::reader_t::read (zmq_msg_t *msg_) { - if (unlikely (terminating)) + if (!active) return false; - if (!pipe->read (msg_)) + if (!pipe->read (msg_)) { + active = false; return false; + } // If delimiter was read, start termination process of the pipe. unsigned char *offset = 0; @@ -123,6 +128,7 @@ void zmq::reader_t::terminate () if (terminating) return; + active = false; terminating = true; send_pipe_term (writer); } @@ -130,6 +136,7 @@ void zmq::reader_t::terminate () void zmq::reader_t::process_activate_reader () { // Forward the event to the sink (either socket or session). + active = true; sink->activated (this); } @@ -150,38 +157,34 @@ void zmq::reader_t::process_pipe_term_ack () zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_, uint64_t hwm_, int64_t swap_size_) : object_t (parent_), + active (true), pipe (pipe_), reader (reader_), hwm (hwm_), msgs_read (0), msgs_written (0), - msg_store (NULL), - extra_msg_flag (false), - stalled (false), + swap (NULL), sink (NULL), - terminating (false), - pending_close (false) + swapping (false), + pending_delimiter (false), + terminating (false) { // Inform reader about the writer. reader->set_writer (this); + // Open the swap file, if required. if (swap_size_ > 0) { - msg_store = new (std::nothrow) msg_store_t (swap_size_); - if (msg_store != NULL) { - if (msg_store->init () < 0) { - delete msg_store; - msg_store = NULL; - } - } + swap = new (std::nothrow) msg_store_t (swap_size_); + zmq_assert (swap); + int rc = swap->init (); + zmq_assert (rc == 0); } } zmq::writer_t::~writer_t () { - if (extra_msg_flag) - zmq_msg_close (&extra_msg); - - delete msg_store; + if (swap) + delete swap; } void zmq::writer_t::set_event_sink (i_writer_events *sink_) @@ -192,13 +195,26 @@ void zmq::writer_t::set_event_sink (i_writer_events *sink_) bool zmq::writer_t::check_write () { - if (terminating) - return false; - - if (pipe_full () && (msg_store == NULL || msg_store->full () || - extra_msg_flag)) { - stalled = true; + // We've already checked and there's no space free for the new message. + // There's no point in checking once again. + if (unlikely (!active)) return false; + + if (unlikely (swapping)) { + if (unlikely (swap->full ())) { + active = false; + return false; + } + } + else { + if (unlikely (pipe_full ())) { + if (swap) + swapping = true; + else { + active = false; + return false; + } + } } return true; @@ -206,58 +222,44 @@ bool zmq::writer_t::check_write () bool zmq::writer_t::write (zmq_msg_t *msg_) { - if (terminating) - return false; - - if (!check_write ()) + if (unlikely (!check_write ())) return false; - if (pipe_full ()) { - if (msg_store->store (msg_)) { - if (!(msg_->flags & ZMQ_MSG_MORE)) - msg_store->commit (); - } else { - extra_msg = *msg_; - extra_msg_flag = true; - } - } - else { - pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); + if (unlikely (swapping)) { + bool stored = swap->store (msg_); + zmq_assert (stored); if (!(msg_->flags & ZMQ_MSG_MORE)) - msgs_written++; + swap->commit (); + return true; } + pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); + if (!(msg_->flags & ZMQ_MSG_MORE)) + msgs_written++; + return true; } void zmq::writer_t::rollback () { - if (extra_msg_flag && extra_msg.flags & ZMQ_MSG_MORE) { - zmq_msg_close (&extra_msg); - extra_msg_flag = false; + // Remove incomplete message from the swap. + if (unlikely (swapping)) { + swap->rollback (); + return; } - if (msg_store != NULL) - msg_store->rollback (); - + // Remove incomplete message from the pipe. zmq_msg_t msg; - // Remove all incomplete messages from the pipe. while (pipe->unwrite (&msg)) { zmq_assert (msg.flags & ZMQ_MSG_MORE); zmq_msg_close (&msg); - msgs_written--; - } - - if (stalled && check_write ()) { - stalled = false; - zmq_assert (sink); - sink->activated (this); } } void zmq::writer_t::flush () { - if (!pipe->flush ()) + // In the swapping mode, flushing is automatically handled by swap object. + if (!swapping && !pipe->flush ()) send_activate_reader (reader); } @@ -267,19 +269,20 @@ void zmq::writer_t::terminate () if (terminating) return; - if (msg_store == NULL || (msg_store->empty () && !extra_msg_flag)) - write_delimiter (); - else - pending_close = true; -} + // Mark the pipe as not available for writing. + active = false; -void zmq::writer_t::write_delimiter () -{ // Rollback any unfinished messages. rollback (); - // Push delimiter into the pipe. - // Trick the compiler to belive that the tag is a valid pointer. + if (swapping) { + pending_delimiter = true; + return; + } + + // Push delimiter into the pipe. Trick the compiler to belive that + // the tag is a valid pointer. Note that watermarks are not checked + // thus the delimiter can be written even though the pipe is full. zmq_msg_t msg; const unsigned char *offset = 0; msg.content = (void*) (offset + ZMQ_DELIMITER); @@ -290,44 +293,47 @@ void zmq::writer_t::write_delimiter () void zmq::writer_t::process_activate_writer (uint64_t msgs_read_) { - zmq_msg_t msg; - + // Store the reader's message sequence number. msgs_read = msgs_read_; - if (msg_store) { - // Move messages from backing store into pipe. - while (!pipe_full () && !msg_store->empty ()) { - msg_store->fetch(&msg); - // Write message into the pipe. + // If we are in the swapping mode, we have some messages in the swap. + // Given that pipe is now ready for writing we can move part of the + // swap into the pipe. + if (swapping) { + zmq_msg_t msg; + while (!pipe_full () && !swap->empty ()) { + swap->fetch(&msg); pipe->write (msg, msg.flags & ZMQ_MSG_MORE); if (!(msg.flags & ZMQ_MSG_MORE)) msgs_written++; } + if (!pipe->flush ()) + send_activate_reader (reader); + } - if (extra_msg_flag) { - if (!pipe_full ()) { - pipe->write (extra_msg, extra_msg.flags & ZMQ_MSG_MORE); - if (!(extra_msg.flags & ZMQ_MSG_MORE)) - msgs_written++; - extra_msg_flag = false; - } - else if (msg_store->store (&extra_msg)) { - if (!(extra_msg.flags & ZMQ_MSG_MORE)) - msg_store->commit (); - extra_msg_flag = false; - } - } - - if (pending_close && msg_store->empty () && !extra_msg_flag) { - write_delimiter (); - pending_close = false; + // There are no more messages in the swap. We can switch into + // standard in-memory mode. + if (swap->empty ()) { + swapping = false; + + // Push delimiter into the pipe. Trick the compiler to belive that + // the tag is a valid pointer. Note that watermarks are not checked + // thus the delimiter can be written even though the pipe is full. + if (pending_delimiter) { + zmq_msg_t msg; + const unsigned char *offset = 0; + msg.content = (void*) (offset + ZMQ_DELIMITER); + msg.flags = 0; + pipe->write (msg, false); + flush (); + return; } - - flush (); } - if (stalled) { - stalled = false; + // If the writer was non-active before, let's make it active + // (available for writing messages to). + if (!active) { + active = true; zmq_assert (sink); sink->activated (this); } diff --git a/src/pipe.hpp b/src/pipe.hpp index dcdd927..dee37a5 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -87,6 +87,9 @@ namespace zmq // Returns true if the message is delimiter; false otherwise. static bool is_delimiter (zmq_msg_t &msg_); + // True, if pipe can be read from. + bool active; + // The underlying pipe. pipe_t *pipe; @@ -127,8 +130,8 @@ namespace zmq void set_event_sink (i_writer_events *endpoint_); // Checks whether a message can be written to the pipe. - // If writing the message would cause high watermark to be - // exceeded, the function returns false. + // If writing the message would cause high watermark and (optionally) + // swap to be exceeded, the function returns false. bool check_write (); // Writes a message to the underlying pipe. Returns false if the @@ -150,17 +153,17 @@ namespace zmq uint64_t hwm_, int64_t swap_size_); ~writer_t (); - void process_activate_writer (uint64_t msgs_read_); - // Command handlers. + void process_activate_writer (uint64_t msgs_read_); void process_pipe_term (); - // Tests whether the pipe is already full. + // Tests whether underlying pipe is already full. The swap is not + // taken into account. bool pipe_full (); - // Write special message to the pipe so that the reader - // can find out we are finished. - void write_delimiter (); + // True, if this object can be written to. Undelying ypipe may be full + // but as long as there's swap space available, this flag is true. + bool active; // The underlying pipe. pipe_t *pipe; @@ -178,26 +181,24 @@ namespace zmq // Number of messages we have written so far. uint64_t msgs_written; - // Pointer to backing store. If NULL, messages are always + // Pointer to the message swap. If NULL, messages are always // kept in main memory. - msg_store_t *msg_store; - - bool extra_msg_flag; - - zmq_msg_t extra_msg; - - // True iff the last attempt to write a message has failed. - bool stalled; + msg_store_t *swap; // Sink for the events (either the socket or the session). i_writer_events *sink; + // If true, swap is active. New messages are to be written to the swap. + bool swapping; + + // If true, there's a delimiter to be written to the pipe after the + // swap is empied. + bool pending_delimiter; + // True is 'terminate' method was called of 'pipe_term' command // arrived from the reader. bool terminating; - bool pending_close; - writer_t (const writer_t&); void operator = (const writer_t&); }; |