From e0246e32d79d71f8e73207b43aed8b23648e4fc7 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 21 Apr 2011 22:27:48 +0200 Subject: Message-related functionality factored out into msg_t class. This patch addresses serveral issues: 1. It gathers message related functionality scattered over whole codebase into a single class. 2. It makes zmq_msg_t an opaque datatype. Internals of the class don't pollute zmq.h header file. 3. zmq_msg_t size decreases from 48 to 32 bytes. That saves ~33% of memory in scenarios with large amount of small messages. Signed-off-by: Martin Sustrik --- src/dist.cpp | 66 ++++++++++++++++-------------------------------------------- 1 file changed, 17 insertions(+), 49 deletions(-) (limited to 'src/dist.cpp') diff --git a/src/dist.cpp b/src/dist.cpp index 9d50368..093da79 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -18,8 +18,6 @@ along with this program. If not, see . */ -#include "../include/zmq.h" - #include "dist.hpp" #include "pipe.hpp" #include "err.hpp" @@ -89,10 +87,10 @@ void zmq::dist_t::activated (writer_t *pipe_) active++; } -int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) +int zmq::dist_t::send (msg_t *msg_, int flags_) { // Is this end of a multipart message? - bool msg_more = msg_->flags & ZMQ_MSG_MORE; + bool msg_more = msg_->flags () & msg_t::more; // Push the message to active pipes. distribute (msg_, flags_); @@ -106,63 +104,33 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) return 0; } -void zmq::dist_t::distribute (zmq_msg_t *msg_, int flags_) +void zmq::dist_t::distribute (msg_t *msg_, int flags_) { // If there are no active pipes available, simply drop the message. if (active == 0) { - int rc = zmq_msg_close (msg_); - zmq_assert (rc == 0); - rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); - return; - } - - msg_content_t *content = (msg_content_t*) msg_->content; - - // For VSMs the copying is straighforward. - if (content == (msg_content_t*) ZMQ_VSM) { - for (pipes_t::size_type i = 0; i < active;) - if (write (pipes [i], msg_)) - i++; - int rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); - return; - } - - // Optimisation for the case when there's only a single pipe - // to send the message to - no refcount adjustment i.e. no atomic - // operations are needed. - if (active == 1) { - if (!write (pipes [0], msg_)) { - int rc = zmq_msg_close (msg_); - zmq_assert (rc == 0); - } - int rc = zmq_msg_init (msg_); + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); zmq_assert (rc == 0); return; } - // There are at least 2 destinations for the message. That means we have - // to deal with reference counting. First add N-1 references to - // the content (we are holding one reference anyway, that's why -1). - if (msg_->flags & ZMQ_MSG_SHARED) - content->refcnt.add (active - 1); - else { - content->refcnt.set (active); - msg_->flags |= ZMQ_MSG_SHARED; - } + // Add active-1 references to the message. We already hold one reference, + // that's why -1. + msg_->add_refs (active - 1); - // Push the message to all destinations. + // Push copy of the message to each active pipe. for (pipes_t::size_type i = 0; i < active;) { if (!write (pipes [i], msg_)) - content->refcnt.sub (1); + msg_->rm_refs (1); else i++; } - // Detach the original message from the data buffer. - int rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); + // Detach the original message from the data buffer. Note that we don't + // close the message. That's because we've already used all the references. + int rc = msg_->init (); + errno_assert (rc == 0); } bool zmq::dist_t::has_out () @@ -170,14 +138,14 @@ bool zmq::dist_t::has_out () return true; } -bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_) +bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) { if (!pipe_->write (msg_)) { active--; pipes.swap (pipes.index (pipe_), active); return false; } - if (!(msg_->flags & ZMQ_MSG_MORE)) + if (!(msg_->flags () & msg_t::more)) pipe_->flush (); return true; } -- cgit v1.2.3