/*
Copyright (c) 2009-2012 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of Crossroads I/O project.
Crossroads I/O is free software; you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
Crossroads is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see .
*/
#include "msg.hpp"
#include "../include/xs.h"
#include
#include
#include
#include
#include "stdint.hpp"
#include "likely.hpp"
#include "err.hpp"
// Check whether the sizes of public representation of the message (xs_msg_t)
// and private represenation of the message (xs::msg_t) match.
typedef char xs_msg_size_check
[2 * ((sizeof (xs::msg_t) == sizeof (xs_msg_t)) != 0) - 1];
bool xs::msg_t::check ()
{
return u.base.type >= type_min && u.base.type <= type_max;
}
int xs::msg_t::init ()
{
u.vsm.type = type_vsm;
u.vsm.flags = 0;
u.vsm.size = 0;
return 0;
}
int xs::msg_t::init_size (size_t size_)
{
if (size_ <= max_vsm_size) {
u.vsm.type = type_vsm;
u.vsm.flags = 0;
u.vsm.size = (unsigned char) size_;
}
else {
u.lmsg.type = type_lmsg;
u.lmsg.flags = 0;
u.lmsg.content =
(content_t*) malloc (sizeof (content_t) + size_);
if (!u.lmsg.content) {
errno = ENOMEM;
return -1;
}
u.lmsg.content->data = u.lmsg.content + 1;
u.lmsg.content->size = size_;
u.lmsg.content->ffn = NULL;
u.lmsg.content->hint = NULL;
new (&u.lmsg.content->refcnt) xs::atomic_counter_t ();
}
return 0;
}
int xs::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
void *hint_)
{
u.lmsg.type = type_lmsg;
u.lmsg.flags = 0;
u.lmsg.content = (content_t*) malloc (sizeof (content_t));
if (!u.lmsg.content) {
errno = ENOMEM;
return -1;
}
u.lmsg.content->data = data_;
u.lmsg.content->size = size_;
u.lmsg.content->ffn = ffn_;
u.lmsg.content->hint = hint_;
new (&u.lmsg.content->refcnt) xs::atomic_counter_t ();
return 0;
}
int xs::msg_t::init_delimiter ()
{
u.delimiter.type = type_delimiter;
u.delimiter.flags = 0;
return 0;
}
int xs::msg_t::close ()
{
// Check the validity of the message.
if (unlikely (!check ())) {
errno = EFAULT;
return -1;
}
if (u.base.type == type_lmsg) {
// If the content is not shared, or if it is shared and the reference
// count has dropped to zero, deallocate it.
if (!(u.lmsg.flags & msg_t::shared) ||
!u.lmsg.content->refcnt.sub (1)) {
// We used "placement new" operator to initialize the reference
// counter so we call the destructor explicitly now.
u.lmsg.content->refcnt.~atomic_counter_t ();
if (u.lmsg.content->ffn)
u.lmsg.content->ffn (u.lmsg.content->data,
u.lmsg.content->hint);
free (u.lmsg.content);
}
}
// Make the message invalid.
u.base.type = 0;
return 0;
}
int xs::msg_t::move (msg_t &src_)
{
// Check the validity of the source.
if (unlikely (!src_.check ())) {
errno = EFAULT;
return -1;
}
int rc = close ();
if (unlikely (rc < 0))
return rc;
*this = src_;
rc = src_.init ();
if (unlikely (rc < 0))
return rc;
return 0;
}
int xs::msg_t::copy (msg_t &src_)
{
// Check the validity of the source.
if (unlikely (!src_.check ())) {
errno = EFAULT;
return -1;
}
int rc = close ();
if (unlikely (rc < 0))
return rc;
if (src_.u.base.type == type_lmsg) {
// One reference is added to shared messages. Non-shared messages
// are turned into shared messages and reference count is set to 2.
if (src_.u.lmsg.flags & msg_t::shared)
src_.u.lmsg.content->refcnt.add (1);
else {
src_.u.lmsg.flags |= msg_t::shared;
src_.u.lmsg.content->refcnt.set (2);
}
}
*this = src_;
return 0;
}
void *xs::msg_t::data ()
{
// Check the validity of the message.
xs_assert (check ());
switch (u.base.type) {
case type_vsm:
return u.vsm.data;
case type_lmsg:
return u.lmsg.content->data;
default:
xs_assert (false);
return NULL;
}
}
size_t xs::msg_t::size ()
{
// Check the validity of the message.
xs_assert (check ());
switch (u.base.type) {
case type_vsm:
return u.vsm.size;
case type_lmsg:
return u.lmsg.content->size;
default:
xs_assert (false);
return 0;
}
}
unsigned char xs::msg_t::flags ()
{
return u.base.flags;
}
void xs::msg_t::set_flags (unsigned char flags_)
{
u.base.flags |= flags_;
}
void xs::msg_t::reset_flags (unsigned char flags_)
{
u.base.flags &= ~flags_;
}
bool xs::msg_t::is_delimiter ()
{
return u.base.type == type_delimiter;
}
bool xs::msg_t::is_vsm ()
{
return u.base.type == type_vsm;
}
void xs::msg_t::add_refs (int refs_)
{
xs_assert (refs_ >= 0);
// No copies required.
if (!refs_)
return;
// VSMs and delimiters can be copied straight away. The only message type
// that needs special care are long messages.
if (u.base.type == type_lmsg) {
if (u.lmsg.flags & msg_t::shared)
u.lmsg.content->refcnt.add (refs_);
else {
u.lmsg.content->refcnt.set (refs_ + 1);
u.lmsg.flags |= msg_t::shared;
}
}
}
bool xs::msg_t::rm_refs (int refs_)
{
xs_assert (refs_ >= 0);
// No copies required.
if (!refs_)
return true;
// If there's only one reference close the message.
if (u.base.type != type_lmsg || !(u.lmsg.flags & msg_t::shared)) {
close ();
return false;
}
// The only message type that needs special care are long messages.
if (!u.lmsg.content->refcnt.sub (refs_)) {
close ();
return false;
}
return true;
}