From 6be4b0143793ab5ceebc5d9d6bbe5c2f1333a0d2 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 21 Aug 2009 14:29:22 +0200 Subject: session management implemented --- src/zmq.cpp | 111 ++++++++++++++++++++++++++++++++---------------------------- 1 file changed, 59 insertions(+), 52 deletions(-) (limited to 'src/zmq.cpp') diff --git a/src/zmq.cpp b/src/zmq.cpp index 7ebe003..49096ad 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -26,76 +26,79 @@ #include "socket_base.hpp" #include "err.hpp" #include "dispatcher.hpp" -#include "msg.hpp" +#include "msg_content.hpp" -int zmq_msg_init (zmq_msg *msg_) +int zmq_msg_init (zmq_msg_t *msg_) { - msg_->content = (zmq_msg_content*) ZMQ_VSM; + msg_->content = (zmq::msg_content_t*) ZMQ_VSM; msg_->vsm_size = 0; return 0; } -int zmq_msg_init_size (zmq_msg *msg_, size_t size_) +int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) { if (size_ <= ZMQ_MAX_VSM_SIZE) { - msg_->content = (zmq_msg_content*) ZMQ_VSM; + msg_->content = (zmq::msg_content_t*) ZMQ_VSM; msg_->vsm_size = (uint16_t) size_; } else { - msg_->content = (zmq_msg_content*) malloc (sizeof (zmq_msg_content) + - size_); + msg_->content = + (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_); if (!msg_->content) { errno = ENOMEM; return -1; } msg_->shared = 0; - - msg_->content->data = (void*) (msg_->content + 1); - msg_->content->size = size_; - msg_->content->ffn = NULL; - new (&msg_->content->refcnt) zmq::atomic_counter_t (); + + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; + content->data = (void*) (content + 1); + content->size = size_; + content->ffn = NULL; + new (&content->refcnt) zmq::atomic_counter_t (); } return 0; } -int zmq_msg_init_data (zmq_msg *msg_, void *data_, size_t size_, +int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_) { msg_->shared = 0; - msg_->content = (zmq_msg_content*) malloc (sizeof (zmq_msg_content)); + msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t)); zmq_assert (msg_->content); - msg_->content->data = data_; - msg_->content->size = size_; - msg_->content->ffn = ffn_; - new (&msg_->content->refcnt) zmq::atomic_counter_t (); + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; + content->data = data_; + content->size = size_; + content->ffn = ffn_; + new (&content->refcnt) zmq::atomic_counter_t (); return 0; } -int zmq_msg_close (zmq_msg *msg_) +int zmq_msg_close (zmq_msg_t *msg_) { - // For VSMs and delimiters there are no resources to free - if (msg_->content == (zmq_msg_content*) ZMQ_DELIMITER || - msg_->content == (zmq_msg_content*) ZMQ_VSM || - msg_->content == (zmq_msg_content*) ZMQ_GAP) + // For VSMs and delimiters there are no resources to free. + if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER || + msg_->content == (zmq::msg_content_t*) ZMQ_VSM || + msg_->content == (zmq::msg_content_t*) ZMQ_GAP) return 0; - // If the content is not shared, or if it is shared and the reference + // If the content is not shared, or if it is shared and the reference. // count has dropped to zero, deallocate it. - if (!msg_->shared || !msg_->content->refcnt.sub (1)) { + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; + if (!msg_->shared || !content->refcnt.sub (1)) { - // We used "placement new" operator to initialize the reference + // We used "placement new" operator to initialize the reference. // counter so we call its destructor now. - msg_->content->refcnt.~atomic_counter_t (); + content->refcnt.~atomic_counter_t (); - if (msg_->content->ffn) - msg_->content->ffn (msg_->content->data); - free (msg_->content); + if (content->ffn) + content->ffn (content->data); + free (content); } return 0; } -int zmq_msg_move (zmq_msg *dest_, zmq_msg *src_) +int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_) { zmq_msg_close (dest_); *dest_ = *src_; @@ -103,23 +106,24 @@ int zmq_msg_move (zmq_msg *dest_, zmq_msg *src_) return 0; } -int zmq_msg_copy (zmq_msg *dest_, zmq_msg *src_) +int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) { zmq_msg_close (dest_); // VSMs and delimiters require no special handling. if (src_->content != - (zmq_msg_content*) ZMQ_DELIMITER && - src_->content != (zmq_msg_content*) ZMQ_VSM && - src_->content != (zmq_msg_content*) ZMQ_GAP) { + (zmq::msg_content_t*) ZMQ_DELIMITER && + src_->content != (zmq::msg_content_t*) ZMQ_VSM && + src_->content != (zmq::msg_content_t*) ZMQ_GAP) { // One reference is added to shared messages. Non-shared messages // are turned into shared messages and reference count is set to 2. + zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content; if (src_->shared) - src_->content->refcnt.add (1); + content->refcnt.add (1); else { src_->shared = true; - src_->content->refcnt.set (2); + content->refcnt.set (2); } } @@ -127,32 +131,34 @@ int zmq_msg_copy (zmq_msg *dest_, zmq_msg *src_) return 0; } -void *zmq_msg_data (zmq_msg *msg_) +void *zmq_msg_data (zmq_msg_t *msg_) { - if (msg_->content == (zmq_msg_content*) ZMQ_VSM) + if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) return msg_->vsm_data; if (msg_->content == - (zmq_msg_content*) ZMQ_DELIMITER || - msg_->content == (zmq_msg_content*) ZMQ_GAP) + (zmq::msg_content_t*) ZMQ_DELIMITER || + msg_->content == (zmq::msg_content_t*) ZMQ_GAP) return NULL; - return msg_->content->data; + + return ((zmq::msg_content_t*) msg_->content)->data; } -size_t zmq_msg_size (zmq_msg *msg_) +size_t zmq_msg_size (zmq_msg_t *msg_) { - if (msg_->content == (zmq_msg_content*) ZMQ_VSM) + if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) return msg_->vsm_size; if (msg_->content == - (zmq_msg_content*) ZMQ_DELIMITER || - msg_->content == (zmq_msg_content*) ZMQ_GAP) + (zmq::msg_content_t*) ZMQ_DELIMITER || + msg_->content == (zmq::msg_content_t*) ZMQ_GAP) return 0; - return msg_->content->size; + + return ((zmq::msg_content_t*) msg_->content)->size; } -int zmq_msg_type (zmq_msg *msg_) +int zmq_msg_type (zmq_msg_t *msg_) { // If it's a genuine message, return 0. - if (msg_->content >= (zmq_msg_content*) ZMQ_VSM) + if (msg_->content >= (zmq::msg_content_t*) ZMQ_VSM) return 0; // Trick the compiler to believe that content is an integer. @@ -192,7 +198,8 @@ int zmq_close (void *s_) return 0; } -int zmq_setsockopt (void *s_, int option_, void *optval_, size_t optvallen_) +int zmq_setsockopt (void *s_, int option_, const void *optval_, + size_t optvallen_) { return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_, optvallen_)); @@ -208,7 +215,7 @@ int zmq_connect (void *s_, const char *addr_) return (((zmq::socket_base_t*) s_)->connect (addr_)); } -int zmq_send (void *s_, zmq_msg *msg_, int flags_) +int zmq_send (void *s_, zmq_msg_t *msg_, int flags_) { return (((zmq::socket_base_t*) s_)->send (msg_, flags_)); } @@ -218,7 +225,7 @@ int zmq_flush (void *s_) return (((zmq::socket_base_t*) s_)->flush ()); } -int zmq_recv (void *s_, zmq_msg *msg_, int flags_) +int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) { return (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); } -- cgit v1.2.3