diff options
-rw-r--r-- | src/Makefile.am | 3 | ||||
-rw-r--r-- | src/dist.cpp | 2 | ||||
-rw-r--r-- | src/msg.cpp | 159 | ||||
-rw-r--r-- | src/msg.hpp (renamed from src/msg_content.hpp) | 0 | ||||
-rw-r--r-- | src/zmq.cpp | 131 |
5 files changed, 162 insertions, 133 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 32b144e..de83d76 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -33,7 +33,7 @@ libzmq_la_SOURCES = \ lb.hpp \ likely.hpp \ mailbox.hpp \ - msg_content.hpp \ + msg.hpp \ mutex.hpp \ named_session.hpp \ object.hpp \ @@ -96,6 +96,7 @@ libzmq_la_SOURCES = \ kqueue.cpp \ lb.cpp \ mailbox.cpp \ + msg.cpp \ named_session.cpp \ object.cpp \ options.cpp \ diff --git a/src/dist.cpp b/src/dist.cpp index e447bc1..9d50368 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -24,7 +24,7 @@ #include "pipe.hpp" #include "err.hpp" #include "own.hpp" -#include "msg_content.hpp" +#include "msg.hpp" zmq::dist_t::dist_t (own_t *sink_) : active (0), diff --git a/src/msg.cpp b/src/msg.cpp new file mode 100644 index 0000000..e9d1da7 --- /dev/null +++ b/src/msg.cpp @@ -0,0 +1,159 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "msg.hpp" + +#include <string.h> +#include <errno.h> +#include <stdlib.h> +#include <new> + +#include "stdint.hpp" +#include "err.hpp" + +int zmq_msg_init (zmq_msg_t *msg_) +{ + msg_->content = (zmq::msg_content_t*) ZMQ_VSM; + msg_->flags = 0; + msg_->vsm_size = 0; + return 0; +} + +int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) +{ + if (size_ <= ZMQ_MAX_VSM_SIZE) { + msg_->content = (zmq::msg_content_t*) ZMQ_VSM; + msg_->flags = 0; + msg_->vsm_size = (uint8_t) size_; + } + else { + msg_->content = + (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_); + if (!msg_->content) { + errno = ENOMEM; + return -1; + } + msg_->flags = 0; + + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; + content->data = (void*) (content + 1); + content->size = size_; + content->ffn = NULL; + content->hint = NULL; + new (&content->refcnt) zmq::atomic_counter_t (); + } + return 0; +} + +int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, + zmq_free_fn *ffn_, void *hint_) +{ + msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t)); + alloc_assert (msg_->content); + msg_->flags = 0; + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; + content->data = data_; + content->size = size_; + content->ffn = ffn_; + content->hint = hint_; + new (&content->refcnt) zmq::atomic_counter_t (); + return 0; +} + +int zmq_msg_close (zmq_msg_t *msg_) +{ + // For VSMs and delimiters there are no resources to free. + if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER && + msg_->content != (zmq::msg_content_t*) ZMQ_VSM) { + + // If the content is not shared, or if it is shared and the reference. + // count has dropped to zero, deallocate it. + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; + if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) { + + // We used "placement new" operator to initialize the reference. + // counter so we call its destructor now. + content->refcnt.~atomic_counter_t (); + + if (content->ffn) + content->ffn (content->data, content->hint); + free (content); + } + } + + // As a safety measure, let's make the deallocated message look like + // an empty message. + msg_->content = (zmq::msg_content_t*) ZMQ_VSM; + msg_->flags = 0; + msg_->vsm_size = 0; + + return 0; +} + +int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_) +{ + zmq_msg_close (dest_); + *dest_ = *src_; + zmq_msg_init (src_); + return 0; +} + +int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) +{ + zmq_msg_close (dest_); + + // VSMs and delimiters require no special handling. + if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER && + src_->content != (zmq::msg_content_t*) ZMQ_VSM) { + + // One reference is added to shared messages. Non-shared messages + // are turned into shared messages and reference count is set to 2. + zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content; + if (src_->flags & ZMQ_MSG_SHARED) + content->refcnt.add (1); + else { + src_->flags |= ZMQ_MSG_SHARED; + content->refcnt.set (2); + } + } + + *dest_ = *src_; + return 0; +} + +void *zmq_msg_data (zmq_msg_t *msg_) +{ + if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) + return msg_->vsm_data; + if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) + return NULL; + + return ((zmq::msg_content_t*) msg_->content)->data; +} + +size_t zmq_msg_size (zmq_msg_t *msg_) +{ + if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) + return msg_->vsm_size; + if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) + return 0; + + return ((zmq::msg_content_t*) msg_->content)->size; +} diff --git a/src/msg_content.hpp b/src/msg.hpp index 7e22098..7e22098 100644 --- a/src/msg_content.hpp +++ b/src/msg.hpp diff --git a/src/zmq.cpp b/src/zmq.cpp index 9ea106d..eb8cc40 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -40,7 +40,6 @@ #include <new> #include "socket_base.hpp" -#include "msg_content.hpp" #include "stdint.hpp" #include "config.hpp" #include "likely.hpp" @@ -70,136 +69,6 @@ const char *zmq_strerror (int errnum_) return zmq::errno_to_string (errnum_); } -int zmq_msg_init (zmq_msg_t *msg_) -{ - msg_->content = (zmq::msg_content_t*) ZMQ_VSM; - msg_->flags = 0; - msg_->vsm_size = 0; - return 0; -} - -int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) -{ - if (size_ <= ZMQ_MAX_VSM_SIZE) { - msg_->content = (zmq::msg_content_t*) ZMQ_VSM; - msg_->flags = 0; - msg_->vsm_size = (uint8_t) size_; - } - else { - msg_->content = - (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_); - if (!msg_->content) { - errno = ENOMEM; - return -1; - } - msg_->flags = 0; - - zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; - content->data = (void*) (content + 1); - content->size = size_; - content->ffn = NULL; - content->hint = NULL; - new (&content->refcnt) zmq::atomic_counter_t (); - } - return 0; -} - -int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, - zmq_free_fn *ffn_, void *hint_) -{ - msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t)); - alloc_assert (msg_->content); - msg_->flags = 0; - zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; - content->data = data_; - content->size = size_; - content->ffn = ffn_; - content->hint = hint_; - new (&content->refcnt) zmq::atomic_counter_t (); - return 0; -} - -int zmq_msg_close (zmq_msg_t *msg_) -{ - // For VSMs and delimiters there are no resources to free. - if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER && - msg_->content != (zmq::msg_content_t*) ZMQ_VSM) { - - // If the content is not shared, or if it is shared and the reference. - // count has dropped to zero, deallocate it. - zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; - if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) { - - // We used "placement new" operator to initialize the reference. - // counter so we call its destructor now. - content->refcnt.~atomic_counter_t (); - - if (content->ffn) - content->ffn (content->data, content->hint); - free (content); - } - } - - // As a safety measure, let's make the deallocated message look like - // an empty message. - msg_->content = (zmq::msg_content_t*) ZMQ_VSM; - msg_->flags = 0; - msg_->vsm_size = 0; - - return 0; -} - -int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_) -{ - zmq_msg_close (dest_); - *dest_ = *src_; - zmq_msg_init (src_); - return 0; -} - -int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) -{ - zmq_msg_close (dest_); - - // VSMs and delimiters require no special handling. - if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER && - src_->content != (zmq::msg_content_t*) ZMQ_VSM) { - - // One reference is added to shared messages. Non-shared messages - // are turned into shared messages and reference count is set to 2. - zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content; - if (src_->flags & ZMQ_MSG_SHARED) - content->refcnt.add (1); - else { - src_->flags |= ZMQ_MSG_SHARED; - content->refcnt.set (2); - } - } - - *dest_ = *src_; - return 0; -} - -void *zmq_msg_data (zmq_msg_t *msg_) -{ - if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) - return msg_->vsm_data; - if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) - return NULL; - - return ((zmq::msg_content_t*) msg_->content)->data; -} - -size_t zmq_msg_size (zmq_msg_t *msg_) -{ - if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) - return msg_->vsm_size; - if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) - return 0; - - return ((zmq::msg_content_t*) msg_->content)->size; -} - void *zmq_init (int io_threads_) { if (io_threads_ < 0) { |