summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-03-09 16:56:53 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-03-09 16:56:53 +0100
commit531c6af0d4df606ddef15da821dad20399b9480a (patch)
tree168140cb9d656469e4d4f66a187f5a4dd43b5fe4
parent96ccc1c5fceb56bd7ffc2e6bef9ddab5347d722b (diff)
message flags added to zmq_msg_t strcuture
-rw-r--r--bindings/c/zmq.h14
-rw-r--r--src/pipe.cpp2
-rw-r--r--src/pub.cpp4
-rw-r--r--src/zmq.cpp14
4 files changed, 20 insertions, 14 deletions
diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h
index 9eac38a..7d8d8ad 100644
--- a/bindings/c/zmq.h
+++ b/bindings/c/zmq.h
@@ -102,15 +102,19 @@ ZMQ_EXPORT const char *zmq_strerror (int errnum);
#define ZMQ_DELIMITER 31
#define ZMQ_VSM 32
-// A message. If 'shared' is true, message content pointed to by 'content'
-// is shared, i.e. reference counting is used to manage its lifetime
-// rather than straighforward malloc/free. Not that 'content' is not a pointer
-// to the raw data. Rather it is pointer to zmq::msg_content_t structure
+// Message flags. ZMQ_MSG_SHARED is strictly speaking not a message flag
+// (it has no equivalent in the wire format), however, making it a flag
+// allows us to pack the stucture tigher and thus improve performance.
+#define ZMQ_MSG_TBC 1
+#define ZMQ_MSG_SHARED 128
+
+// A message. Note that 'content' is not a pointer to the raw data.
+// Rather it is pointer to zmq::msg_content_t structure
// (see src/msg_content.hpp for its definition).
typedef struct
{
void *content;
- unsigned char shared;
+ unsigned char flags;
unsigned char vsm_size;
unsigned char vsm_data [ZMQ_MAX_VSM_SIZE];
} zmq_msg_t;
diff --git a/src/pipe.cpp b/src/pipe.cpp
index e738128..f4582f2 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -174,7 +174,7 @@ void zmq::writer_t::term ()
zmq_msg_t msg;
const unsigned char *offset = 0;
msg.content = (void*) (offset + ZMQ_DELIMITER);
- msg.shared = false;
+ msg.flags = 0;
pipe->write (msg);
pipe->flush ();
}
diff --git a/src/pub.cpp b/src/pub.cpp
index 5b9d48c..342b1d2 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -121,11 +121,11 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// There are at least 2 destinations for the message. That means we have
// to deal with reference counting. First add N-1 references to
// the content (we are holding one reference anyway, that's why -1).
- if (msg_->shared)
+ if (msg_->flags & ZMQ_MSG_SHARED)
content->refcnt.add (pipes_count - 1);
else {
content->refcnt.set (pipes_count);
- msg_->shared = true;
+ msg_->flags |= ZMQ_MSG_SHARED;
}
// Push the message to all destinations.
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 409416d..8540e06 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -96,6 +96,7 @@ const char *zmq_strerror (int errnum_)
int zmq_msg_init (zmq_msg_t *msg_)
{
msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
+ msg_->flags = 0;
msg_->vsm_size = 0;
return 0;
}
@@ -104,6 +105,7 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
if (size_ <= ZMQ_MAX_VSM_SIZE) {
msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
+ msg_->flags = 0;
msg_->vsm_size = (uint8_t) size_;
}
else {
@@ -113,8 +115,8 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
errno = ENOMEM;
return -1;
}
- msg_->shared = 0;
-
+ msg_->flags = 0;
+
zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
content->data = (void*) (content + 1);
content->size = size_;
@@ -128,9 +130,9 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
zmq_free_fn *ffn_, void *hint_)
{
- msg_->shared = 0;
msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
zmq_assert (msg_->content);
+ msg_->flags = 0;
zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
content->data = data_;
content->size = size_;
@@ -150,7 +152,7 @@ int zmq_msg_close (zmq_msg_t *msg_)
// If the content is not shared, or if it is shared and the reference.
// count has dropped to zero, deallocate it.
zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
- if (!msg_->shared || !content->refcnt.sub (1)) {
+ if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) {
// We used "placement new" operator to initialize the reference.
// counter so we call its destructor now.
@@ -183,10 +185,10 @@ int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
// One reference is added to shared messages. Non-shared messages
// are turned into shared messages and reference count is set to 2.
zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content;
- if (src_->shared)
+ if (src_->flags & ZMQ_MSG_SHARED)
content->refcnt.add (1);
else {
- src_->shared = true;
+ src_->flags |= ZMQ_MSG_SHARED;
content->refcnt.set (2);
}
}