diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-04-21 22:27:48 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-04-21 22:27:48 +0200 |
commit | e0246e32d79d71f8e73207b43aed8b23648e4fc7 (patch) | |
tree | 9952ee6fd39f4e27bbe932f6b6f30f0073009369 /src/pipe.cpp | |
parent | 581697695aac72894f2d3fefac904b9d50b3ba67 (diff) |
Message-related functionality factored out into msg_t class.
This patch addresses serveral issues:
1. It gathers message related functionality scattered over whole
codebase into a single class.
2. It makes zmq_msg_t an opaque datatype. Internals of the class
don't pollute zmq.h header file.
3. zmq_msg_t size decreases from 48 to 32 bytes. That saves ~33%
of memory in scenarios with large amount of small messages.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/pipe.cpp')
-rw-r--r-- | src/pipe.cpp | 49 |
1 files changed, 22 insertions, 27 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index 2af2dc2..36dc808 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -20,8 +20,6 @@ #include <new> -#include "../include/zmq.h" - #include "pipe.hpp" #include "likely.hpp" @@ -53,11 +51,12 @@ zmq::reader_t::~reader_t () zmq_assert (pipe); // First delete all the unread messages in the pipe. We have to do it by - // hand because zmq_msg_t is a POD, not a class, so there's no associated - // destructor. - zmq_msg_t msg; - while (pipe->read (&msg)) - zmq_msg_close (&msg); + // hand because msg_t doesn't have automatic destructor. + msg_t msg; + while (pipe->read (&msg)) { + int rc = msg.close (); + errno_assert (rc == 0); + } delete pipe; } @@ -68,11 +67,9 @@ void zmq::reader_t::set_event_sink (i_reader_events *sink_) sink = sink_; } -bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_) +bool zmq::reader_t::is_delimiter (msg_t &msg_) { - unsigned char *offset = 0; - - return msg_.content == (void*) (offset + ZMQ_DELIMITER); + return msg_.is_delimiter (); } bool zmq::reader_t::check_read () @@ -89,7 +86,7 @@ bool zmq::reader_t::check_read () // If the next item in the pipe is message delimiter, // initiate its termination. if (pipe->probe (is_delimiter)) { - zmq_msg_t msg; + msg_t msg; bool ok = pipe->read (&msg); zmq_assert (ok); if (sink) @@ -101,7 +98,7 @@ bool zmq::reader_t::check_read () return true; } -bool zmq::reader_t::read (zmq_msg_t *msg_) +bool zmq::reader_t::read (msg_t *msg_) { if (!active) return false; @@ -112,15 +109,14 @@ bool zmq::reader_t::read (zmq_msg_t *msg_) } // If delimiter was read, start termination process of the pipe. - unsigned char *offset = 0; - if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) { + if (msg_->is_delimiter ()) { if (sink) sink->delimited (this); terminate (); return false; } - if (!(msg_->flags & ZMQ_MSG_MORE)) + if (!(msg_->flags () & msg_t::more)) msgs_read++; if (lwm > 0 && msgs_read % lwm == 0) @@ -187,7 +183,7 @@ void zmq::writer_t::set_event_sink (i_writer_events *sink_) sink = sink_; } -bool zmq::writer_t::check_write (zmq_msg_t *msg_) +bool zmq::writer_t::check_write (msg_t *msg_) { // We've already checked and there's no space free for the new message. // There's no point in checking once again. @@ -202,13 +198,13 @@ bool zmq::writer_t::check_write (zmq_msg_t *msg_) return true; } -bool zmq::writer_t::write (zmq_msg_t *msg_) +bool zmq::writer_t::write (msg_t *msg_) { if (unlikely (!check_write (msg_))) return false; - pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); - if (!(msg_->flags & ZMQ_MSG_MORE)) + pipe->write (*msg_, msg_->flags () & msg_t::more); + if (!(msg_->flags () & msg_t::more)) msgs_written++; return true; @@ -217,10 +213,11 @@ bool zmq::writer_t::write (zmq_msg_t *msg_) void zmq::writer_t::rollback () { // Remove incomplete message from the pipe. - zmq_msg_t msg; + msg_t msg; while (pipe->unwrite (&msg)) { - zmq_assert (msg.flags & ZMQ_MSG_MORE); - zmq_msg_close (&msg); + zmq_assert (msg.flags () & msg_t::more); + int rc = msg.close (); + errno_assert (rc == 0); } } @@ -246,10 +243,8 @@ void zmq::writer_t::terminate () // Push delimiter into the pipe. Trick the compiler to belive that // the tag is a valid pointer. Note that watermarks are not checked // thus the delimiter can be written even though the pipe is full. - zmq_msg_t msg; - const unsigned char *offset = 0; - msg.content = (void*) (offset + ZMQ_DELIMITER); - msg.flags = 0; + msg_t msg; + msg.init_delimiter (); pipe->write (msg, false); flush (); } |