summaryrefslogtreecommitdiff
path: root/src/zmq.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zmq.cpp')
-rw-r--r--src/zmq.cpp111
1 files changed, 59 insertions, 52 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 7ebe003..49096ad 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -26,76 +26,79 @@
#include "socket_base.hpp"
#include "err.hpp"
#include "dispatcher.hpp"
-#include "msg.hpp"
+#include "msg_content.hpp"
-int zmq_msg_init (zmq_msg *msg_)
+int zmq_msg_init (zmq_msg_t *msg_)
{
- msg_->content = (zmq_msg_content*) ZMQ_VSM;
+ msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
msg_->vsm_size = 0;
return 0;
}
-int zmq_msg_init_size (zmq_msg *msg_, size_t size_)
+int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
if (size_ <= ZMQ_MAX_VSM_SIZE) {
- msg_->content = (zmq_msg_content*) ZMQ_VSM;
+ msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
msg_->vsm_size = (uint16_t) size_;
}
else {
- msg_->content = (zmq_msg_content*) malloc (sizeof (zmq_msg_content) +
- size_);
+ msg_->content =
+ (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_);
if (!msg_->content) {
errno = ENOMEM;
return -1;
}
msg_->shared = 0;
-
- msg_->content->data = (void*) (msg_->content + 1);
- msg_->content->size = size_;
- msg_->content->ffn = NULL;
- new (&msg_->content->refcnt) zmq::atomic_counter_t ();
+
+ zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
+ content->data = (void*) (content + 1);
+ content->size = size_;
+ content->ffn = NULL;
+ new (&content->refcnt) zmq::atomic_counter_t ();
}
return 0;
}
-int zmq_msg_init_data (zmq_msg *msg_, void *data_, size_t size_,
+int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
zmq_free_fn *ffn_)
{
msg_->shared = 0;
- msg_->content = (zmq_msg_content*) malloc (sizeof (zmq_msg_content));
+ msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
zmq_assert (msg_->content);
- msg_->content->data = data_;
- msg_->content->size = size_;
- msg_->content->ffn = ffn_;
- new (&msg_->content->refcnt) zmq::atomic_counter_t ();
+ zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
+ content->data = data_;
+ content->size = size_;
+ content->ffn = ffn_;
+ new (&content->refcnt) zmq::atomic_counter_t ();
return 0;
}
-int zmq_msg_close (zmq_msg *msg_)
+int zmq_msg_close (zmq_msg_t *msg_)
{
- // For VSMs and delimiters there are no resources to free
- if (msg_->content == (zmq_msg_content*) ZMQ_DELIMITER ||
- msg_->content == (zmq_msg_content*) ZMQ_VSM ||
- msg_->content == (zmq_msg_content*) ZMQ_GAP)
+ // For VSMs and delimiters there are no resources to free.
+ if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER ||
+ msg_->content == (zmq::msg_content_t*) ZMQ_VSM ||
+ msg_->content == (zmq::msg_content_t*) ZMQ_GAP)
return 0;
- // If the content is not shared, or if it is shared and the reference
+ // If the content is not shared, or if it is shared and the reference.
// count has dropped to zero, deallocate it.
- if (!msg_->shared || !msg_->content->refcnt.sub (1)) {
+ zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
+ if (!msg_->shared || !content->refcnt.sub (1)) {
- // We used "placement new" operator to initialize the reference
+ // We used "placement new" operator to initialize the reference.
// counter so we call its destructor now.
- msg_->content->refcnt.~atomic_counter_t ();
+ content->refcnt.~atomic_counter_t ();
- if (msg_->content->ffn)
- msg_->content->ffn (msg_->content->data);
- free (msg_->content);
+ if (content->ffn)
+ content->ffn (content->data);
+ free (content);
}
return 0;
}
-int zmq_msg_move (zmq_msg *dest_, zmq_msg *src_)
+int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
zmq_msg_close (dest_);
*dest_ = *src_;
@@ -103,23 +106,24 @@ int zmq_msg_move (zmq_msg *dest_, zmq_msg *src_)
return 0;
}
-int zmq_msg_copy (zmq_msg *dest_, zmq_msg *src_)
+int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
zmq_msg_close (dest_);
// VSMs and delimiters require no special handling.
if (src_->content !=
- (zmq_msg_content*) ZMQ_DELIMITER &&
- src_->content != (zmq_msg_content*) ZMQ_VSM &&
- src_->content != (zmq_msg_content*) ZMQ_GAP) {
+ (zmq::msg_content_t*) ZMQ_DELIMITER &&
+ src_->content != (zmq::msg_content_t*) ZMQ_VSM &&
+ src_->content != (zmq::msg_content_t*) ZMQ_GAP) {
// One reference is added to shared messages. Non-shared messages
// are turned into shared messages and reference count is set to 2.
+ zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content;
if (src_->shared)
- src_->content->refcnt.add (1);
+ content->refcnt.add (1);
else {
src_->shared = true;
- src_->content->refcnt.set (2);
+ content->refcnt.set (2);
}
}
@@ -127,32 +131,34 @@ int zmq_msg_copy (zmq_msg *dest_, zmq_msg *src_)
return 0;
}
-void *zmq_msg_data (zmq_msg *msg_)
+void *zmq_msg_data (zmq_msg_t *msg_)
{
- if (msg_->content == (zmq_msg_content*) ZMQ_VSM)
+ if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
return msg_->vsm_data;
if (msg_->content ==
- (zmq_msg_content*) ZMQ_DELIMITER ||
- msg_->content == (zmq_msg_content*) ZMQ_GAP)
+ (zmq::msg_content_t*) ZMQ_DELIMITER ||
+ msg_->content == (zmq::msg_content_t*) ZMQ_GAP)
return NULL;
- return msg_->content->data;
+
+ return ((zmq::msg_content_t*) msg_->content)->data;
}
-size_t zmq_msg_size (zmq_msg *msg_)
+size_t zmq_msg_size (zmq_msg_t *msg_)
{
- if (msg_->content == (zmq_msg_content*) ZMQ_VSM)
+ if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
return msg_->vsm_size;
if (msg_->content ==
- (zmq_msg_content*) ZMQ_DELIMITER ||
- msg_->content == (zmq_msg_content*) ZMQ_GAP)
+ (zmq::msg_content_t*) ZMQ_DELIMITER ||
+ msg_->content == (zmq::msg_content_t*) ZMQ_GAP)
return 0;
- return msg_->content->size;
+
+ return ((zmq::msg_content_t*) msg_->content)->size;
}
-int zmq_msg_type (zmq_msg *msg_)
+int zmq_msg_type (zmq_msg_t *msg_)
{
// If it's a genuine message, return 0.
- if (msg_->content >= (zmq_msg_content*) ZMQ_VSM)
+ if (msg_->content >= (zmq::msg_content_t*) ZMQ_VSM)
return 0;
// Trick the compiler to believe that content is an integer.
@@ -192,7 +198,8 @@ int zmq_close (void *s_)
return 0;
}
-int zmq_setsockopt (void *s_, int option_, void *optval_, size_t optvallen_)
+int zmq_setsockopt (void *s_, int option_, const void *optval_,
+ size_t optvallen_)
{
return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_,
optvallen_));
@@ -208,7 +215,7 @@ int zmq_connect (void *s_, const char *addr_)
return (((zmq::socket_base_t*) s_)->connect (addr_));
}
-int zmq_send (void *s_, zmq_msg *msg_, int flags_)
+int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)
{
return (((zmq::socket_base_t*) s_)->send (msg_, flags_));
}
@@ -218,7 +225,7 @@ int zmq_flush (void *s_)
return (((zmq::socket_base_t*) s_)->flush ());
}
-int zmq_recv (void *s_, zmq_msg *msg_, int flags_)
+int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
{
return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
}