From 6191213a5737b774d83f341d4507b8baf702d381 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 12 Apr 2011 14:20:49 +0200 Subject: Code dealing with messages moved to msg.cpp Signed-off-by: Martin Sustrik --- src/Makefile.am | 3 +- src/dist.cpp | 2 +- src/msg.cpp | 159 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/msg.hpp | 52 +++++++++++++++++ src/msg_content.hpp | 52 ----------------- src/zmq.cpp | 131 ------------------------------------------- 6 files changed, 214 insertions(+), 185 deletions(-) create mode 100644 src/msg.cpp create mode 100644 src/msg.hpp delete mode 100644 src/msg_content.hpp diff --git a/src/Makefile.am b/src/Makefile.am index 32b144e..de83d76 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -33,7 +33,7 @@ libzmq_la_SOURCES = \ lb.hpp \ likely.hpp \ mailbox.hpp \ - msg_content.hpp \ + msg.hpp \ mutex.hpp \ named_session.hpp \ object.hpp \ @@ -96,6 +96,7 @@ libzmq_la_SOURCES = \ kqueue.cpp \ lb.cpp \ mailbox.cpp \ + msg.cpp \ named_session.cpp \ object.cpp \ options.cpp \ diff --git a/src/dist.cpp b/src/dist.cpp index e447bc1..9d50368 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -24,7 +24,7 @@ #include "pipe.hpp" #include "err.hpp" #include "own.hpp" -#include "msg_content.hpp" +#include "msg.hpp" zmq::dist_t::dist_t (own_t *sink_) : active (0), diff --git a/src/msg.cpp b/src/msg.cpp new file mode 100644 index 0000000..e9d1da7 --- /dev/null +++ b/src/msg.cpp @@ -0,0 +1,159 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "msg.hpp" + +#include +#include +#include +#include + +#include "stdint.hpp" +#include "err.hpp" + +int zmq_msg_init (zmq_msg_t *msg_) +{ + msg_->content = (zmq::msg_content_t*) ZMQ_VSM; + msg_->flags = 0; + msg_->vsm_size = 0; + return 0; +} + +int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) +{ + if (size_ <= ZMQ_MAX_VSM_SIZE) { + msg_->content = (zmq::msg_content_t*) ZMQ_VSM; + msg_->flags = 0; + msg_->vsm_size = (uint8_t) size_; + } + else { + msg_->content = + (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_); + if (!msg_->content) { + errno = ENOMEM; + return -1; + } + msg_->flags = 0; + + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; + content->data = (void*) (content + 1); + content->size = size_; + content->ffn = NULL; + content->hint = NULL; + new (&content->refcnt) zmq::atomic_counter_t (); + } + return 0; +} + +int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, + zmq_free_fn *ffn_, void *hint_) +{ + msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t)); + alloc_assert (msg_->content); + msg_->flags = 0; + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; + content->data = data_; + content->size = size_; + content->ffn = ffn_; + content->hint = hint_; + new (&content->refcnt) zmq::atomic_counter_t (); + return 0; +} + +int zmq_msg_close (zmq_msg_t *msg_) +{ + // For VSMs and delimiters there are no resources to free. + if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER && + msg_->content != (zmq::msg_content_t*) ZMQ_VSM) { + + // If the content is not shared, or if it is shared and the reference. + // count has dropped to zero, deallocate it. + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; + if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) { + + // We used "placement new" operator to initialize the reference. + // counter so we call its destructor now. + content->refcnt.~atomic_counter_t (); + + if (content->ffn) + content->ffn (content->data, content->hint); + free (content); + } + } + + // As a safety measure, let's make the deallocated message look like + // an empty message. + msg_->content = (zmq::msg_content_t*) ZMQ_VSM; + msg_->flags = 0; + msg_->vsm_size = 0; + + return 0; +} + +int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_) +{ + zmq_msg_close (dest_); + *dest_ = *src_; + zmq_msg_init (src_); + return 0; +} + +int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) +{ + zmq_msg_close (dest_); + + // VSMs and delimiters require no special handling. + if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER && + src_->content != (zmq::msg_content_t*) ZMQ_VSM) { + + // One reference is added to shared messages. Non-shared messages + // are turned into shared messages and reference count is set to 2. + zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content; + if (src_->flags & ZMQ_MSG_SHARED) + content->refcnt.add (1); + else { + src_->flags |= ZMQ_MSG_SHARED; + content->refcnt.set (2); + } + } + + *dest_ = *src_; + return 0; +} + +void *zmq_msg_data (zmq_msg_t *msg_) +{ + if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) + return msg_->vsm_data; + if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) + return NULL; + + return ((zmq::msg_content_t*) msg_->content)->data; +} + +size_t zmq_msg_size (zmq_msg_t *msg_) +{ + if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) + return msg_->vsm_size; + if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) + return 0; + + return ((zmq::msg_content_t*) msg_->content)->size; +} diff --git a/src/msg.hpp b/src/msg.hpp new file mode 100644 index 0000000..7e22098 --- /dev/null +++ b/src/msg.hpp @@ -0,0 +1,52 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_MSG_HPP_INCLUDE__ +#define __ZMQ_MSG_HPP_INCLUDE__ + +#include + +#include "../include/zmq.h" + +#include "atomic_counter.hpp" + +namespace zmq +{ + + // Shared message buffer. Message data are either allocated in one + // continuous block along with this structure - thus avoiding one + // malloc/free pair or they are stored in used-supplied memory. + // In the latter case, ffn member stores pointer to the function to be + // used to deallocate the data. If the buffer is actually shared (there + // are at least 2 references to it) refcount member contains number of + // references. + + struct msg_content_t + { + void *data; + size_t size; + zmq_free_fn *ffn; + void *hint; + zmq::atomic_counter_t refcnt; + }; + +} + +#endif diff --git a/src/msg_content.hpp b/src/msg_content.hpp deleted file mode 100644 index 7e22098..0000000 --- a/src/msg_content.hpp +++ /dev/null @@ -1,52 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_MSG_HPP_INCLUDE__ -#define __ZMQ_MSG_HPP_INCLUDE__ - -#include - -#include "../include/zmq.h" - -#include "atomic_counter.hpp" - -namespace zmq -{ - - // Shared message buffer. Message data are either allocated in one - // continuous block along with this structure - thus avoiding one - // malloc/free pair or they are stored in used-supplied memory. - // In the latter case, ffn member stores pointer to the function to be - // used to deallocate the data. If the buffer is actually shared (there - // are at least 2 references to it) refcount member contains number of - // references. - - struct msg_content_t - { - void *data; - size_t size; - zmq_free_fn *ffn; - void *hint; - zmq::atomic_counter_t refcnt; - }; - -} - -#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index 9ea106d..eb8cc40 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -40,7 +40,6 @@ #include #include "socket_base.hpp" -#include "msg_content.hpp" #include "stdint.hpp" #include "config.hpp" #include "likely.hpp" @@ -70,136 +69,6 @@ const char *zmq_strerror (int errnum_) return zmq::errno_to_string (errnum_); } -int zmq_msg_init (zmq_msg_t *msg_) -{ - msg_->content = (zmq::msg_content_t*) ZMQ_VSM; - msg_->flags = 0; - msg_->vsm_size = 0; - return 0; -} - -int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) -{ - if (size_ <= ZMQ_MAX_VSM_SIZE) { - msg_->content = (zmq::msg_content_t*) ZMQ_VSM; - msg_->flags = 0; - msg_->vsm_size = (uint8_t) size_; - } - else { - msg_->content = - (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_); - if (!msg_->content) { - errno = ENOMEM; - return -1; - } - msg_->flags = 0; - - zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; - content->data = (void*) (content + 1); - content->size = size_; - content->ffn = NULL; - content->hint = NULL; - new (&content->refcnt) zmq::atomic_counter_t (); - } - return 0; -} - -int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, - zmq_free_fn *ffn_, void *hint_) -{ - msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t)); - alloc_assert (msg_->content); - msg_->flags = 0; - zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; - content->data = data_; - content->size = size_; - content->ffn = ffn_; - content->hint = hint_; - new (&content->refcnt) zmq::atomic_counter_t (); - return 0; -} - -int zmq_msg_close (zmq_msg_t *msg_) -{ - // For VSMs and delimiters there are no resources to free. - if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER && - msg_->content != (zmq::msg_content_t*) ZMQ_VSM) { - - // If the content is not shared, or if it is shared and the reference. - // count has dropped to zero, deallocate it. - zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; - if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) { - - // We used "placement new" operator to initialize the reference. - // counter so we call its destructor now. - content->refcnt.~atomic_counter_t (); - - if (content->ffn) - content->ffn (content->data, content->hint); - free (content); - } - } - - // As a safety measure, let's make the deallocated message look like - // an empty message. - msg_->content = (zmq::msg_content_t*) ZMQ_VSM; - msg_->flags = 0; - msg_->vsm_size = 0; - - return 0; -} - -int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_) -{ - zmq_msg_close (dest_); - *dest_ = *src_; - zmq_msg_init (src_); - return 0; -} - -int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) -{ - zmq_msg_close (dest_); - - // VSMs and delimiters require no special handling. - if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER && - src_->content != (zmq::msg_content_t*) ZMQ_VSM) { - - // One reference is added to shared messages. Non-shared messages - // are turned into shared messages and reference count is set to 2. - zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content; - if (src_->flags & ZMQ_MSG_SHARED) - content->refcnt.add (1); - else { - src_->flags |= ZMQ_MSG_SHARED; - content->refcnt.set (2); - } - } - - *dest_ = *src_; - return 0; -} - -void *zmq_msg_data (zmq_msg_t *msg_) -{ - if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) - return msg_->vsm_data; - if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) - return NULL; - - return ((zmq::msg_content_t*) msg_->content)->data; -} - -size_t zmq_msg_size (zmq_msg_t *msg_) -{ - if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) - return msg_->vsm_size; - if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) - return 0; - - return ((zmq::msg_content_t*) msg_->content)->size; -} - void *zmq_init (int io_threads_) { if (io_threads_ < 0) { -- cgit v1.2.3