From e0246e32d79d71f8e73207b43aed8b23648e4fc7 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 21 Apr 2011 22:27:48 +0200 Subject: Message-related functionality factored out into msg_t class. This patch addresses serveral issues: 1. It gathers message related functionality scattered over whole codebase into a single class. 2. It makes zmq_msg_t an opaque datatype. Internals of the class don't pollute zmq.h header file. 3. zmq_msg_t size decreases from 48 to 32 bytes. That saves ~33% of memory in scenarios with large amount of small messages. Signed-off-by: Martin Sustrik --- src/pipe.cpp | 49 ++++++++++++++++++++++--------------------------- 1 file changed, 22 insertions(+), 27 deletions(-) (limited to 'src/pipe.cpp') diff --git a/src/pipe.cpp b/src/pipe.cpp index 2af2dc2..36dc808 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -20,8 +20,6 @@ #include -#include "../include/zmq.h" - #include "pipe.hpp" #include "likely.hpp" @@ -53,11 +51,12 @@ zmq::reader_t::~reader_t () zmq_assert (pipe); // First delete all the unread messages in the pipe. We have to do it by - // hand because zmq_msg_t is a POD, not a class, so there's no associated - // destructor. - zmq_msg_t msg; - while (pipe->read (&msg)) - zmq_msg_close (&msg); + // hand because msg_t doesn't have automatic destructor. + msg_t msg; + while (pipe->read (&msg)) { + int rc = msg.close (); + errno_assert (rc == 0); + } delete pipe; } @@ -68,11 +67,9 @@ void zmq::reader_t::set_event_sink (i_reader_events *sink_) sink = sink_; } -bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_) +bool zmq::reader_t::is_delimiter (msg_t &msg_) { - unsigned char *offset = 0; - - return msg_.content == (void*) (offset + ZMQ_DELIMITER); + return msg_.is_delimiter (); } bool zmq::reader_t::check_read () @@ -89,7 +86,7 @@ bool zmq::reader_t::check_read () // If the next item in the pipe is message delimiter, // initiate its termination. if (pipe->probe (is_delimiter)) { - zmq_msg_t msg; + msg_t msg; bool ok = pipe->read (&msg); zmq_assert (ok); if (sink) @@ -101,7 +98,7 @@ bool zmq::reader_t::check_read () return true; } -bool zmq::reader_t::read (zmq_msg_t *msg_) +bool zmq::reader_t::read (msg_t *msg_) { if (!active) return false; @@ -112,15 +109,14 @@ bool zmq::reader_t::read (zmq_msg_t *msg_) } // If delimiter was read, start termination process of the pipe. - unsigned char *offset = 0; - if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) { + if (msg_->is_delimiter ()) { if (sink) sink->delimited (this); terminate (); return false; } - if (!(msg_->flags & ZMQ_MSG_MORE)) + if (!(msg_->flags () & msg_t::more)) msgs_read++; if (lwm > 0 && msgs_read % lwm == 0) @@ -187,7 +183,7 @@ void zmq::writer_t::set_event_sink (i_writer_events *sink_) sink = sink_; } -bool zmq::writer_t::check_write (zmq_msg_t *msg_) +bool zmq::writer_t::check_write (msg_t *msg_) { // We've already checked and there's no space free for the new message. // There's no point in checking once again. @@ -202,13 +198,13 @@ bool zmq::writer_t::check_write (zmq_msg_t *msg_) return true; } -bool zmq::writer_t::write (zmq_msg_t *msg_) +bool zmq::writer_t::write (msg_t *msg_) { if (unlikely (!check_write (msg_))) return false; - pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); - if (!(msg_->flags & ZMQ_MSG_MORE)) + pipe->write (*msg_, msg_->flags () & msg_t::more); + if (!(msg_->flags () & msg_t::more)) msgs_written++; return true; @@ -217,10 +213,11 @@ bool zmq::writer_t::write (zmq_msg_t *msg_) void zmq::writer_t::rollback () { // Remove incomplete message from the pipe. - zmq_msg_t msg; + msg_t msg; while (pipe->unwrite (&msg)) { - zmq_assert (msg.flags & ZMQ_MSG_MORE); - zmq_msg_close (&msg); + zmq_assert (msg.flags () & msg_t::more); + int rc = msg.close (); + errno_assert (rc == 0); } } @@ -246,10 +243,8 @@ void zmq::writer_t::terminate () // Push delimiter into the pipe. Trick the compiler to belive that // the tag is a valid pointer. Note that watermarks are not checked // thus the delimiter can be written even though the pipe is full. - zmq_msg_t msg; - const unsigned char *offset = 0; - msg.content = (void*) (offset + ZMQ_DELIMITER); - msg.flags = 0; + msg_t msg; + msg.init_delimiter (); pipe->write (msg, false); flush (); } -- cgit v1.2.3