From e0246e32d79d71f8e73207b43aed8b23648e4fc7 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 21 Apr 2011 22:27:48 +0200 Subject: Message-related functionality factored out into msg_t class. This patch addresses serveral issues: 1. It gathers message related functionality scattered over whole codebase into a single class. 2. It makes zmq_msg_t an opaque datatype. Internals of the class don't pollute zmq.h header file. 3. zmq_msg_t size decreases from 48 to 32 bytes. That saves ~33% of memory in scenarios with large amount of small messages. Signed-off-by: Martin Sustrik --- src/msg.cpp | 249 +++++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 164 insertions(+), 85 deletions(-) (limited to 'src/msg.cpp') diff --git a/src/msg.cpp b/src/msg.cpp index e800bd6..bd6f066 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -29,155 +29,234 @@ #include "likely.hpp" #include "err.hpp" -int zmq_msg_init (zmq_msg_t *msg_) +bool zmq::msg_t::check () { - msg_->content = (zmq::msg_content_t*) ZMQ_VSM; - msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; - msg_->vsm_size = 0; + return u.base.type >= type_min && u.base.type <= type_max; +} + +int zmq::msg_t::init () +{ + u.vsm.type = type_vsm; + u.vsm.flags = 0; + u.vsm.size = 0; return 0; } -int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) +int zmq::msg_t::init_size (size_t size_) { - if (size_ <= ZMQ_MAX_VSM_SIZE) { - msg_->content = (zmq::msg_content_t*) ZMQ_VSM; - msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; - msg_->vsm_size = (uint8_t) size_; + if (size_ <= max_vsm_size) { + u.vsm.type = type_vsm; + u.vsm.flags = 0; + u.vsm.size = (unsigned char) size_; } else { - msg_->content = - (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_); - if (!msg_->content) { + u.lmsg.type = type_lmsg; + u.lmsg.flags = 0; + u.lmsg.content = + (content_t*) malloc (sizeof (content_t) + size_); + if (!u.lmsg.content) { errno = ENOMEM; return -1; } - msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; - - zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; - content->data = (void*) (content + 1); - content->size = size_; - content->ffn = NULL; - content->hint = NULL; - new (&content->refcnt) zmq::atomic_counter_t (); + + u.lmsg.content->data = u.lmsg.content + 1; + u.lmsg.content->size = size_; + u.lmsg.content->ffn = NULL; + u.lmsg.content->hint = NULL; + new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); } return 0; } -int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, - zmq_free_fn *ffn_, void *hint_) +int zmq::msg_t::init_data (void *data_, size_t size_, zmq_free_fn *ffn_, + void *hint_) { - msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t)); - alloc_assert (msg_->content); - msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; - zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; - content->data = data_; - content->size = size_; - content->ffn = ffn_; - content->hint = hint_; - new (&content->refcnt) zmq::atomic_counter_t (); + u.lmsg.type = type_lmsg; + u.lmsg.flags = 0; + u.lmsg.content = (content_t*) malloc (sizeof (content_t)); + alloc_assert (u.lmsg.content); + + u.lmsg.content->data = data_; + u.lmsg.content->size = size_; + u.lmsg.content->ffn = ffn_; + u.lmsg.content->hint = hint_; + new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); return 0; + } -int zmq_msg_close (zmq_msg_t *msg_) +int zmq::msg_t::init_delimiter () { - // Check the validity tag. - if (unlikely (msg_->flags | ZMQ_MSG_MASK) != 0xff) { + u.delimiter.type = type_delimiter; + u.delimiter.flags = 0; + return 0; +} + +int zmq::msg_t::close () +{ + // Check the validity of the message. + if (unlikely (!check ())) { errno = EFAULT; return -1; } - // For VSMs and delimiters there are no resources to free. - if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER && - msg_->content != (zmq::msg_content_t*) ZMQ_VSM) { + if (u.base.type == type_lmsg) { - // If the content is not shared, or if it is shared and the reference. + // If the content is not shared, or if it is shared and the reference // count has dropped to zero, deallocate it. - zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; - if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) { + if (!(u.lmsg.flags & msg_t::shared) || + !u.lmsg.content->refcnt.sub (1)) { - // We used "placement new" operator to initialize the reference. - // counter so we call its destructor now. - content->refcnt.~atomic_counter_t (); + // We used "placement new" operator to initialize the reference + // counter so we call the destructor explicitly now. + u.lmsg.content->refcnt.~atomic_counter_t (); - if (content->ffn) - content->ffn (content->data, content->hint); - free (content); + if (u.lmsg.content->ffn) + u.lmsg.content->ffn (u.lmsg.content->data, + u.lmsg.content->hint); + free (u.lmsg.content); } } - // Remove the validity tag from the message. - msg_->flags = 0; + // Make the message invalid. + u.base.type = 0; return 0; + } -int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_) +int zmq::msg_t::move (msg_t &src_) { - // Check the validity tags. - if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff || - (src_->flags | ZMQ_MSG_MASK) != 0xff)) { + // Check the validity of the source. + if (unlikely (!src_.check ())) { errno = EFAULT; return -1; } - zmq_msg_close (dest_); - *dest_ = *src_; - zmq_msg_init (src_); + int rc = close (); + if (unlikely (rc < 0)) + return rc; + + *this = src_; + + rc = src_.init (); + if (unlikely (rc < 0)) + return rc; + return 0; } -int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) +int zmq::msg_t::copy (msg_t &src_) { - // Check the validity tags. - if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff || - (src_->flags | ZMQ_MSG_MASK) != 0xff)) { + // Check the validity of the source. + if (unlikely (!src_.check ())) { errno = EFAULT; return -1; } - zmq_msg_close (dest_); + int rc = close (); + if (unlikely (rc < 0)) + return rc; - // VSMs and delimiters require no special handling. - if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER && - src_->content != (zmq::msg_content_t*) ZMQ_VSM) { + if (src_.u.base.type == type_lmsg) { // One reference is added to shared messages. Non-shared messages // are turned into shared messages and reference count is set to 2. - zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content; - if (src_->flags & ZMQ_MSG_SHARED) - content->refcnt.add (1); + if (src_.u.lmsg.flags & msg_t::shared) + src_.u.lmsg.content->refcnt.add (1); else { - src_->flags |= ZMQ_MSG_SHARED; - content->refcnt.set (2); + src_.u.lmsg.flags |= msg_t::shared; + src_.u.lmsg.content->refcnt.set (2); } } - *dest_ = *src_; + *this = src_; + return 0; + +} + +void *zmq::msg_t::data () +{ + // Check the validity of the message. + zmq_assert (check ()); + + switch (u.base.type) { + case type_vsm: + return u.vsm.data; + case type_lmsg: + return u.lmsg.content->data; + default: + zmq_assert (false); + } } -void *zmq_msg_data (zmq_msg_t *msg_) +size_t zmq::msg_t::size () { - // Check the validity tag. - zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff); + // Check the validity of the message. + zmq_assert (check ()); - if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) - return msg_->vsm_data; - if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) - return NULL; + switch (u.base.type) { + case type_vsm: + return u.vsm.size; + case type_lmsg: + return u.lmsg.content->size; + default: + zmq_assert (false); + } +} - return ((zmq::msg_content_t*) msg_->content)->data; +unsigned char zmq::msg_t::flags () +{ + return u.base.flags; } -size_t zmq_msg_size (zmq_msg_t *msg_) +void zmq::msg_t::set_flags (unsigned char flags_) { - // Check the validity tag. - zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff); + u.base.flags |= flags_; +} - if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) - return msg_->vsm_size; - if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) - return 0; +void zmq::msg_t::reset_flags (unsigned char flags_) +{ + u.base.flags &= ~flags_; +} + +bool zmq::msg_t::is_delimiter () +{ + return u.base.type == type_delimiter; +} + +void zmq::msg_t::add_refs (int refs_) +{ + zmq_assert (refs_ >= 0); - return ((zmq::msg_content_t*) msg_->content)->size; + // No copies required. + if (!refs_) + return; + + // VSMs and delimiters can be copied straight away. The only message type + // that needs special care are long messages. + if (u.base.type == type_lmsg) { + if (u.lmsg.flags & msg_t::shared) + u.lmsg.content->refcnt.add (refs_); + else { + u.lmsg.content->refcnt.set (refs_ + 1); + u.lmsg.flags |= msg_t::shared; + } + } +} + +void zmq::msg_t::rm_refs (int refs_) +{ + zmq_assert (refs_ >= 0); + + // No copies required. + if (!refs_) + return; + + // The only message type that needs special care are long messages. + if (u.base.type == type_lmsg) { + zmq_assert (u.lmsg.flags & msg_t::shared); + u.lmsg.content->refcnt.sub (refs_); + } } -- cgit v1.2.3