summaryrefslogtreecommitdiff
path: root/src/msg.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/msg.cpp')
-rw-r--r--src/msg.cpp249
1 files changed, 164 insertions, 85 deletions
diff --git a/src/msg.cpp b/src/msg.cpp
index e800bd6..bd6f066 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -29,155 +29,234 @@
#include "likely.hpp"
#include "err.hpp"
-int zmq_msg_init (zmq_msg_t *msg_)
+bool zmq::msg_t::check ()
{
- msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
- msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
- msg_->vsm_size = 0;
+ return u.base.type >= type_min && u.base.type <= type_max;
+}
+
+int zmq::msg_t::init ()
+{
+ u.vsm.type = type_vsm;
+ u.vsm.flags = 0;
+ u.vsm.size = 0;
return 0;
}
-int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
+int zmq::msg_t::init_size (size_t size_)
{
- if (size_ <= ZMQ_MAX_VSM_SIZE) {
- msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
- msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
- msg_->vsm_size = (uint8_t) size_;
+ if (size_ <= max_vsm_size) {
+ u.vsm.type = type_vsm;
+ u.vsm.flags = 0;
+ u.vsm.size = (unsigned char) size_;
}
else {
- msg_->content =
- (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_);
- if (!msg_->content) {
+ u.lmsg.type = type_lmsg;
+ u.lmsg.flags = 0;
+ u.lmsg.content =
+ (content_t*) malloc (sizeof (content_t) + size_);
+ if (!u.lmsg.content) {
errno = ENOMEM;
return -1;
}
- msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
-
- zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
- content->data = (void*) (content + 1);
- content->size = size_;
- content->ffn = NULL;
- content->hint = NULL;
- new (&content->refcnt) zmq::atomic_counter_t ();
+
+ u.lmsg.content->data = u.lmsg.content + 1;
+ u.lmsg.content->size = size_;
+ u.lmsg.content->ffn = NULL;
+ u.lmsg.content->hint = NULL;
+ new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
}
return 0;
}
-int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
- zmq_free_fn *ffn_, void *hint_)
+int zmq::msg_t::init_data (void *data_, size_t size_, zmq_free_fn *ffn_,
+ void *hint_)
{
- msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
- alloc_assert (msg_->content);
- msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
- zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
- content->data = data_;
- content->size = size_;
- content->ffn = ffn_;
- content->hint = hint_;
- new (&content->refcnt) zmq::atomic_counter_t ();
+ u.lmsg.type = type_lmsg;
+ u.lmsg.flags = 0;
+ u.lmsg.content = (content_t*) malloc (sizeof (content_t));
+ alloc_assert (u.lmsg.content);
+
+ u.lmsg.content->data = data_;
+ u.lmsg.content->size = size_;
+ u.lmsg.content->ffn = ffn_;
+ u.lmsg.content->hint = hint_;
+ new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
return 0;
+
}
-int zmq_msg_close (zmq_msg_t *msg_)
+int zmq::msg_t::init_delimiter ()
{
- // Check the validity tag.
- if (unlikely (msg_->flags | ZMQ_MSG_MASK) != 0xff) {
+ u.delimiter.type = type_delimiter;
+ u.delimiter.flags = 0;
+ return 0;
+}
+
+int zmq::msg_t::close ()
+{
+ // Check the validity of the message.
+ if (unlikely (!check ())) {
errno = EFAULT;
return -1;
}
- // For VSMs and delimiters there are no resources to free.
- if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
- msg_->content != (zmq::msg_content_t*) ZMQ_VSM) {
+ if (u.base.type == type_lmsg) {
- // If the content is not shared, or if it is shared and the reference.
+ // If the content is not shared, or if it is shared and the reference
// count has dropped to zero, deallocate it.
- zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
- if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) {
+ if (!(u.lmsg.flags & msg_t::shared) ||
+ !u.lmsg.content->refcnt.sub (1)) {
- // We used "placement new" operator to initialize the reference.
- // counter so we call its destructor now.
- content->refcnt.~atomic_counter_t ();
+ // We used "placement new" operator to initialize the reference
+ // counter so we call the destructor explicitly now.
+ u.lmsg.content->refcnt.~atomic_counter_t ();
- if (content->ffn)
- content->ffn (content->data, content->hint);
- free (content);
+ if (u.lmsg.content->ffn)
+ u.lmsg.content->ffn (u.lmsg.content->data,
+ u.lmsg.content->hint);
+ free (u.lmsg.content);
}
}
- // Remove the validity tag from the message.
- msg_->flags = 0;
+ // Make the message invalid.
+ u.base.type = 0;
return 0;
+
}
-int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
+int zmq::msg_t::move (msg_t &src_)
{
- // Check the validity tags.
- if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff ||
- (src_->flags | ZMQ_MSG_MASK) != 0xff)) {
+ // Check the validity of the source.
+ if (unlikely (!src_.check ())) {
errno = EFAULT;
return -1;
}
- zmq_msg_close (dest_);
- *dest_ = *src_;
- zmq_msg_init (src_);
+ int rc = close ();
+ if (unlikely (rc < 0))
+ return rc;
+
+ *this = src_;
+
+ rc = src_.init ();
+ if (unlikely (rc < 0))
+ return rc;
+
return 0;
}
-int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
+int zmq::msg_t::copy (msg_t &src_)
{
- // Check the validity tags.
- if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff ||
- (src_->flags | ZMQ_MSG_MASK) != 0xff)) {
+ // Check the validity of the source.
+ if (unlikely (!src_.check ())) {
errno = EFAULT;
return -1;
}
- zmq_msg_close (dest_);
+ int rc = close ();
+ if (unlikely (rc < 0))
+ return rc;
- // VSMs and delimiters require no special handling.
- if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
- src_->content != (zmq::msg_content_t*) ZMQ_VSM) {
+ if (src_.u.base.type == type_lmsg) {
// One reference is added to shared messages. Non-shared messages
// are turned into shared messages and reference count is set to 2.
- zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content;
- if (src_->flags & ZMQ_MSG_SHARED)
- content->refcnt.add (1);
+ if (src_.u.lmsg.flags & msg_t::shared)
+ src_.u.lmsg.content->refcnt.add (1);
else {
- src_->flags |= ZMQ_MSG_SHARED;
- content->refcnt.set (2);
+ src_.u.lmsg.flags |= msg_t::shared;
+ src_.u.lmsg.content->refcnt.set (2);
}
}
- *dest_ = *src_;
+ *this = src_;
+
return 0;
+
+}
+
+void *zmq::msg_t::data ()
+{
+ // Check the validity of the message.
+ zmq_assert (check ());
+
+ switch (u.base.type) {
+ case type_vsm:
+ return u.vsm.data;
+ case type_lmsg:
+ return u.lmsg.content->data;
+ default:
+ zmq_assert (false);
+ }
}
-void *zmq_msg_data (zmq_msg_t *msg_)
+size_t zmq::msg_t::size ()
{
- // Check the validity tag.
- zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff);
+ // Check the validity of the message.
+ zmq_assert (check ());
- if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
- return msg_->vsm_data;
- if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
- return NULL;
+ switch (u.base.type) {
+ case type_vsm:
+ return u.vsm.size;
+ case type_lmsg:
+ return u.lmsg.content->size;
+ default:
+ zmq_assert (false);
+ }
+}
- return ((zmq::msg_content_t*) msg_->content)->data;
+unsigned char zmq::msg_t::flags ()
+{
+ return u.base.flags;
}
-size_t zmq_msg_size (zmq_msg_t *msg_)
+void zmq::msg_t::set_flags (unsigned char flags_)
{
- // Check the validity tag.
- zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff);
+ u.base.flags |= flags_;
+}
- if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
- return msg_->vsm_size;
- if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
- return 0;
+void zmq::msg_t::reset_flags (unsigned char flags_)
+{
+ u.base.flags &= ~flags_;
+}
+
+bool zmq::msg_t::is_delimiter ()
+{
+ return u.base.type == type_delimiter;
+}
+
+void zmq::msg_t::add_refs (int refs_)
+{
+ zmq_assert (refs_ >= 0);
- return ((zmq::msg_content_t*) msg_->content)->size;
+ // No copies required.
+ if (!refs_)
+ return;
+
+ // VSMs and delimiters can be copied straight away. The only message type
+ // that needs special care are long messages.
+ if (u.base.type == type_lmsg) {
+ if (u.lmsg.flags & msg_t::shared)
+ u.lmsg.content->refcnt.add (refs_);
+ else {
+ u.lmsg.content->refcnt.set (refs_ + 1);
+ u.lmsg.flags |= msg_t::shared;
+ }
+ }
+}
+
+void zmq::msg_t::rm_refs (int refs_)
+{
+ zmq_assert (refs_ >= 0);
+
+ // No copies required.
+ if (!refs_)
+ return;
+
+ // The only message type that needs special care are long messages.
+ if (u.base.type == type_lmsg) {
+ zmq_assert (u.lmsg.flags & msg_t::shared);
+ u.lmsg.content->refcnt.sub (refs_);
+ }
}