/* Copyright (c) 2009-2012 250bpm s.r.o. Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of Crossroads I/O project. Crossroads I/O is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. Crossroads is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #include "msg.hpp" #include "../include/xs/xs.h" #include #include #include #include #include "stdint.hpp" #include "likely.hpp" #include "err.hpp" // Check whether the sizes of public representation of the message (xs_msg_t) // and private represenation of the message (xs::msg_t) match. typedef char xs_msg_size_check [2 * ((sizeof (xs::msg_t) == sizeof (xs_msg_t)) != 0) - 1]; bool xs::msg_t::check () { return u.base.type >= type_min && u.base.type <= type_max; } int xs::msg_t::init () { u.vsm.type = type_vsm; u.vsm.flags = 0; u.vsm.size = 0; return 0; } int xs::msg_t::init_size (size_t size_) { if (size_ <= max_vsm_size) { u.vsm.type = type_vsm; u.vsm.flags = 0; u.vsm.size = (unsigned char) size_; } else { u.lmsg.type = type_lmsg; u.lmsg.flags = 0; u.lmsg.content = (content_t*) malloc (sizeof (content_t) + size_); if (!u.lmsg.content) { errno = ENOMEM; return -1; } u.lmsg.content->data = u.lmsg.content + 1; u.lmsg.content->size = size_; u.lmsg.content->ffn = NULL; u.lmsg.content->hint = NULL; new (&u.lmsg.content->refcnt) xs::atomic_counter_t (); } return 0; } int xs::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_, void *hint_) { u.lmsg.type = type_lmsg; u.lmsg.flags = 0; u.lmsg.content = (content_t*) malloc (sizeof (content_t)); if (!u.lmsg.content) { errno = ENOMEM; return -1; } u.lmsg.content->data = data_; u.lmsg.content->size = size_; u.lmsg.content->ffn = ffn_; u.lmsg.content->hint = hint_; new (&u.lmsg.content->refcnt) xs::atomic_counter_t (); return 0; } int xs::msg_t::init_delimiter () { u.delimiter.type = type_delimiter; u.delimiter.flags = 0; return 0; } int xs::msg_t::close () { // Check the validity of the message. if (unlikely (!check ())) { errno = EFAULT; return -1; } if (u.base.type == type_lmsg) { // If the content is not shared, or if it is shared and the reference // count has dropped to zero, deallocate it. if (!(u.lmsg.flags & msg_t::shared) || !u.lmsg.content->refcnt.sub (1)) { // We used "placement new" operator to initialize the reference // counter so we call the destructor explicitly now. u.lmsg.content->refcnt.~atomic_counter_t (); if (u.lmsg.content->ffn) u.lmsg.content->ffn (u.lmsg.content->data, u.lmsg.content->hint); free (u.lmsg.content); } } // Make the message invalid. u.base.type = 0; return 0; } int xs::msg_t::move (msg_t &src_) { // Check the validity of the source. if (unlikely (!src_.check ())) { errno = EFAULT; return -1; } int rc = close (); if (unlikely (rc < 0)) return rc; *this = src_; rc = src_.init (); if (unlikely (rc < 0)) return rc; return 0; } int xs::msg_t::copy (msg_t &src_) { // Check the validity of the source. if (unlikely (!src_.check ())) { errno = EFAULT; return -1; } int rc = close (); if (unlikely (rc < 0)) return rc; if (src_.u.base.type == type_lmsg) { // One reference is added to shared messages. Non-shared messages // are turned into shared messages and reference count is set to 2. if (src_.u.lmsg.flags & msg_t::shared) src_.u.lmsg.content->refcnt.add (1); else { src_.u.lmsg.flags |= msg_t::shared; src_.u.lmsg.content->refcnt.set (2); } } *this = src_; return 0; } void *xs::msg_t::data () { // Check the validity of the message. xs_assert (check ()); switch (u.base.type) { case type_vsm: return u.vsm.data; case type_lmsg: return u.lmsg.content->data; default: xs_assert (false); return NULL; } } size_t xs::msg_t::size () { // Check the validity of the message. xs_assert (check ()); switch (u.base.type) { case type_vsm: return u.vsm.size; case type_lmsg: return u.lmsg.content->size; default: xs_assert (false); return 0; } } unsigned char xs::msg_t::flags () { return u.base.flags; } void xs::msg_t::set_flags (unsigned char flags_) { u.base.flags |= flags_; } void xs::msg_t::reset_flags (unsigned char flags_) { u.base.flags &= ~flags_; } bool xs::msg_t::is_delimiter () { return u.base.type == type_delimiter; } bool xs::msg_t::is_vsm () { return u.base.type == type_vsm; } void xs::msg_t::add_refs (int refs_) { xs_assert (refs_ >= 0); // No copies required. if (!refs_) return; // VSMs and delimiters can be copied straight away. The only message type // that needs special care are long messages. if (u.base.type == type_lmsg) { if (u.lmsg.flags & msg_t::shared) u.lmsg.content->refcnt.add (refs_); else { u.lmsg.content->refcnt.set (refs_ + 1); u.lmsg.flags |= msg_t::shared; } } } bool xs::msg_t::rm_refs (int refs_) { xs_assert (refs_ >= 0); // No copies required. if (!refs_) return true; // If there's only one reference close the message. if (u.base.type != type_lmsg || !(u.lmsg.flags & msg_t::shared)) { close (); return false; } // The only message type that needs special care are long messages. if (!u.lmsg.content->refcnt.sub (refs_)) { close (); return false; } return true; }