From 531c6af0d4df606ddef15da821dad20399b9480a Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 9 Mar 2010 16:56:53 +0100 Subject: message flags added to zmq_msg_t strcuture --- bindings/c/zmq.h | 14 +++++++++----- src/pipe.cpp | 2 +- src/pub.cpp | 4 ++-- src/zmq.cpp | 14 ++++++++------ 4 files changed, 20 insertions(+), 14 deletions(-) diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h index 9eac38a..7d8d8ad 100644 --- a/bindings/c/zmq.h +++ b/bindings/c/zmq.h @@ -102,15 +102,19 @@ ZMQ_EXPORT const char *zmq_strerror (int errnum); #define ZMQ_DELIMITER 31 #define ZMQ_VSM 32 -// A message. If 'shared' is true, message content pointed to by 'content' -// is shared, i.e. reference counting is used to manage its lifetime -// rather than straighforward malloc/free. Not that 'content' is not a pointer -// to the raw data. Rather it is pointer to zmq::msg_content_t structure +// Message flags. ZMQ_MSG_SHARED is strictly speaking not a message flag +// (it has no equivalent in the wire format), however, making it a flag +// allows us to pack the stucture tigher and thus improve performance. +#define ZMQ_MSG_TBC 1 +#define ZMQ_MSG_SHARED 128 + +// A message. Note that 'content' is not a pointer to the raw data. +// Rather it is pointer to zmq::msg_content_t structure // (see src/msg_content.hpp for its definition). typedef struct { void *content; - unsigned char shared; + unsigned char flags; unsigned char vsm_size; unsigned char vsm_data [ZMQ_MAX_VSM_SIZE]; } zmq_msg_t; diff --git a/src/pipe.cpp b/src/pipe.cpp index e738128..f4582f2 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -174,7 +174,7 @@ void zmq::writer_t::term () zmq_msg_t msg; const unsigned char *offset = 0; msg.content = (void*) (offset + ZMQ_DELIMITER); - msg.shared = false; + msg.flags = 0; pipe->write (msg); pipe->flush (); } diff --git a/src/pub.cpp b/src/pub.cpp index 5b9d48c..342b1d2 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -121,11 +121,11 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) // There are at least 2 destinations for the message. That means we have // to deal with reference counting. First add N-1 references to // the content (we are holding one reference anyway, that's why -1). - if (msg_->shared) + if (msg_->flags & ZMQ_MSG_SHARED) content->refcnt.add (pipes_count - 1); else { content->refcnt.set (pipes_count); - msg_->shared = true; + msg_->flags |= ZMQ_MSG_SHARED; } // Push the message to all destinations. diff --git a/src/zmq.cpp b/src/zmq.cpp index 409416d..8540e06 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -96,6 +96,7 @@ const char *zmq_strerror (int errnum_) int zmq_msg_init (zmq_msg_t *msg_) { msg_->content = (zmq::msg_content_t*) ZMQ_VSM; + msg_->flags = 0; msg_->vsm_size = 0; return 0; } @@ -104,6 +105,7 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) { if (size_ <= ZMQ_MAX_VSM_SIZE) { msg_->content = (zmq::msg_content_t*) ZMQ_VSM; + msg_->flags = 0; msg_->vsm_size = (uint8_t) size_; } else { @@ -113,8 +115,8 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) errno = ENOMEM; return -1; } - msg_->shared = 0; - + msg_->flags = 0; + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; content->data = (void*) (content + 1); content->size = size_; @@ -128,9 +130,9 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_, void *hint_) { - msg_->shared = 0; msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t)); zmq_assert (msg_->content); + msg_->flags = 0; zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; content->data = data_; content->size = size_; @@ -150,7 +152,7 @@ int zmq_msg_close (zmq_msg_t *msg_) // If the content is not shared, or if it is shared and the reference. // count has dropped to zero, deallocate it. zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; - if (!msg_->shared || !content->refcnt.sub (1)) { + if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) { // We used "placement new" operator to initialize the reference. // counter so we call its destructor now. @@ -183,10 +185,10 @@ int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) // One reference is added to shared messages. Non-shared messages // are turned into shared messages and reference count is set to 2. zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content; - if (src_->shared) + if (src_->flags & ZMQ_MSG_SHARED) content->refcnt.add (1); else { - src_->shared = true; + src_->flags |= ZMQ_MSG_SHARED; content->refcnt.set (2); } } -- cgit v1.2.3