diff options
-rw-r--r-- | include/zmq.h | 28 | ||||
-rw-r--r-- | include/zmq.hpp | 11 | ||||
-rw-r--r-- | src/Makefile.am | 10 | ||||
-rw-r--r-- | src/i_inout.hpp | 4 | ||||
-rw-r--r-- | src/msg_content.hpp (renamed from src/msg.hpp) | 9 | ||||
-rw-r--r-- | src/object.cpp | 5 | ||||
-rw-r--r-- | src/object.hpp | 4 | ||||
-rw-r--r-- | src/owned.cpp | 10 | ||||
-rw-r--r-- | src/owned.hpp | 4 | ||||
-rw-r--r-- | src/pipe.hpp | 2 | ||||
-rw-r--r-- | src/session.cpp | 39 | ||||
-rw-r--r-- | src/session.hpp | 15 | ||||
-rw-r--r-- | src/socket_base.cpp | 77 | ||||
-rw-r--r-- | src/socket_base.hpp | 24 | ||||
-rw-r--r-- | src/zmq.cpp | 111 | ||||
-rw-r--r-- | src/zmq_connecter.cpp | 11 | ||||
-rw-r--r-- | src/zmq_connecter.hpp | 10 | ||||
-rw-r--r-- | src/zmq_connecter_init.cpp | 94 | ||||
-rw-r--r-- | src/zmq_connecter_init.hpp (renamed from src/zmq_init.hpp) | 43 | ||||
-rw-r--r-- | src/zmq_decoder.hpp | 2 | ||||
-rw-r--r-- | src/zmq_encoder.hpp | 2 | ||||
-rw-r--r-- | src/zmq_init.cpp | 112 | ||||
-rw-r--r-- | src/zmq_listener.cpp | 5 | ||||
-rw-r--r-- | src/zmq_listener_init.cpp | 96 | ||||
-rw-r--r-- | src/zmq_listener_init.hpp | 71 |
25 files changed, 519 insertions, 280 deletions
diff --git a/include/zmq.h b/include/zmq.h index 46d779b..34ce80c 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -94,50 +94,50 @@ typedef void (zmq_free_fn) (void *data); // is shared, i.e. reference counting is used to manage its lifetime // rather than straighforward malloc/free. struct zmq_msg_content is // not declared in the API. -struct zmq_msg +struct zmq_msg_t { - struct zmq_msg_content *content; + void *content; unsigned char shared; uint16_t vsm_size; unsigned char vsm_data [ZMQ_MAX_VSM_SIZE]; }; // Initialise an empty message (zero bytes long). -ZMQ_EXPORT int zmq_msg_init (zmq_msg *msg); +ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg); // Initialise a message 'size' bytes long. // // Errors: ENOMEM - the size is too large to allocate. -ZMQ_EXPORT int zmq_msg_init_size (zmq_msg *msg, size_t size); +ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg, size_t size); // Initialise a message from an existing buffer. Message isn't copied, // instead 0SOCKETS infrastructure take ownership of the buffer and call // deallocation functio (ffn) once it's not needed anymore. -ZMQ_EXPORT int zmq_msg_init_data (zmq_msg *msg, void *data, size_t size, +ZMQ_EXPORT int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn); // Deallocate the message. -ZMQ_EXPORT int zmq_msg_close (zmq_msg *msg); +ZMQ_EXPORT int zmq_msg_close (zmq_msg_t *msg); // Move the content of the message from 'src' to 'dest'. The content isn't // copied, just moved. 'src' is an empty message after the call. Original // content of 'dest' message is deallocated. -ZMQ_EXPORT int zmq_msg_move (zmq_msg *dest, zmq_msg *src); +ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src); // Copy the 'src' message to 'dest'. The content isn't copied, instead // reference count is increased. Don't modify the message data after the // call as they are shared between two messages. Original content of 'dest' // message is deallocated. -ZMQ_EXPORT int zmq_msg_copy (zmq_msg *dest, zmq_msg *src); +ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src); // Returns pointer to message data. -ZMQ_EXPORT void *zmq_msg_data (zmq_msg *msg); +ZMQ_EXPORT void *zmq_msg_data (zmq_msg_t *msg); // Return size of message data (in bytes). -ZMQ_EXPORT size_t zmq_msg_size (zmq_msg *msg); +ZMQ_EXPORT size_t zmq_msg_size (zmq_msg_t *msg); // Returns type of the message. -ZMQ_EXPORT int zmq_msg_type (zmq_msg *msg); +ZMQ_EXPORT int zmq_msg_type (zmq_msg_t *msg); // Initialise 0SOCKETS context. 'app_threads' specifies maximal number // of application threads that can have open sockets at the same time. @@ -163,7 +163,7 @@ ZMQ_EXPORT int zmq_close (void *s); // Sets an option on the socket. // EINVAL - unknown option, a value with incorrect length or an invalid value. -ZMQ_EXPORT int zmq_setsockopt (void *s, int option_, void *optval_, +ZMQ_EXPORT int zmq_setsockopt (void *s, int option_, const void *optval_, size_t optvallen_); // Bind the socket to a particular address. @@ -182,7 +182,7 @@ ZMQ_EXPORT int zmq_connect (void *s, const char *addr); // Errors: EAGAIN - message cannot be sent at the moment (applies only to // non-blocking send). // ENOTSUP - function isn't supported by particular socket type. -ZMQ_EXPORT int zmq_send (void *s, zmq_msg *msg, int flags); +ZMQ_EXPORT int zmq_send (void *s, zmq_msg_t *msg, int flags); // Flush the messages that were send using ZMQ_NOFLUSH flag down the stream. // @@ -196,7 +196,7 @@ ZMQ_EXPORT int zmq_flush (void *s); // Errors: EAGAIN - message cannot be received at the moment (applies only to // non-blocking receive). // ENOTSUP - function isn't supported by particular socket type. -ZMQ_EXPORT int zmq_recv (void *s, zmq_msg *msg, int flags); +ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags); #ifdef __cplusplus } diff --git a/include/zmq.hpp b/include/zmq.hpp index e8e4f15..8397464 100644 --- a/include/zmq.hpp +++ b/include/zmq.hpp @@ -74,7 +74,7 @@ namespace zmq // copied it - the behaviour is undefined. Don't change the body of the // received message either - other threads may be accessing it in parallel. - class message_t : private zmq_msg + class message_t : private zmq_msg_t { friend class socket_t; @@ -139,7 +139,7 @@ namespace zmq // of data after the operation. inline void move_to (message_t *msg_) { - int rc = zmq_msg_move (this, (zmq_msg*) msg_); + int rc = zmq_msg_move (this, (zmq_msg_t*) msg_); assert (rc == 0); } @@ -148,7 +148,7 @@ namespace zmq // these get deallocated. inline void copy_to (message_t *msg_) { - int rc = zmq_msg_copy (this, (zmq_msg*) msg_); + int rc = zmq_msg_copy (this, (zmq_msg_t*) msg_); assert (rc == 0); } @@ -230,9 +230,10 @@ namespace zmq assert (rc == 0); } - template <typename T> inline void setsockopt (int option_, T &value_) + inline void setsockopt (int option_, const void *optval_, + size_t optvallen_) { - int rc = zmq_setsockopt (ptr, option_, (void*) &value_, sizeof (T)); + int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_); assert (rc == 0); } diff --git a/src/Makefile.am b/src/Makefile.am index 9d6127c..396e3a3 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -23,7 +23,7 @@ libzmq_la_SOURCES = \ i_poll_events.hpp \ i_signaler.hpp \ kqueue.hpp \ - msg.hpp \ + msg_content.hpp \ mutex.hpp \ object.hpp \ options.hpp \ @@ -47,11 +47,12 @@ libzmq_la_SOURCES = \ ypollset.hpp \ yqueue.hpp \ zmq_connecter.hpp \ + zmq_connecter_init.hpp \ zmq_decoder.hpp \ zmq_encoder.hpp \ zmq_engine.hpp \ - zmq_init.hpp \ zmq_listener.hpp \ + zmq_listener_init.hpp \ app_thread.cpp \ devpoll.cpp \ dispatcher.cpp \ @@ -77,11 +78,12 @@ libzmq_la_SOURCES = \ ypollset.cpp \ zmq.cpp \ zmq_connecter.cpp \ + zmq_connecter_init.cpp \ zmq_decoder.cpp \ zmq_encoder.cpp \ zmq_engine.cpp \ - zmq_init.cpp \ - zmq_listener.cpp + zmq_listener.cpp \ + zmq_listener_init.cpp libzmq_la_LDFLAGS = -version-info 0:0:0 libzmq_la_CXXFLAGS = -Wall -pedantic -Werror @ZMQ_EXTRA_CXXFLAGS@ diff --git a/src/i_inout.hpp b/src/i_inout.hpp index be2e007..8901c04 100644 --- a/src/i_inout.hpp +++ b/src/i_inout.hpp @@ -27,8 +27,8 @@ namespace zmq struct i_inout { - virtual bool read (::zmq_msg *msg_) = 0; - virtual bool write (::zmq_msg *msg_) = 0; + virtual bool read (::zmq_msg_t *msg_) = 0; + virtual bool write (::zmq_msg_t *msg_) = 0; virtual void flush () = 0; }; diff --git a/src/msg.hpp b/src/msg_content.hpp index 4f35961..b468746 100644 --- a/src/msg.hpp +++ b/src/msg_content.hpp @@ -26,8 +26,8 @@ #include "atomic_counter.hpp" -//namespace zmq -//{ +namespace zmq +{ // Shared message buffer. Message data are either allocated in one // continuous block along with this structure - thus avoiding one @@ -36,7 +36,8 @@ // used to deallocate the data. If the buffer is actually shared (there // are at least 2 references to it) refcount member contains number of // references. - struct zmq_msg_content + + struct msg_content_t { void *data; size_t size; @@ -44,6 +45,6 @@ zmq::atomic_counter_t refcnt; }; -//} +} #endif diff --git a/src/object.cpp b/src/object.cpp index b1b7c8a..0a25750 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -131,9 +131,8 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) void zmq::object_t::send_attach (session_t *destination_, zmq_engine_t *engine_) { - // Let the object know that it cannot shut down till it gets this command. - destination_->inc_seqnum (); - + // The assumption here is that command sequence number of the destination + // object was already incremented in find_session function. command_t cmd; cmd.destination = destination_; cmd.type = command_t::attach; diff --git a/src/object.hpp b/src/object.hpp index 02a071a..31c8c40 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -30,6 +30,10 @@ namespace zmq class object_t { + // Repository of sessions needs to use caller's send_* functions + // when creating new session. TODO: Get rid of this dependency. + friend class socket_base_t; + public: object_t (class dispatcher_t *dispatcher_, int thread_slot_); diff --git a/src/owned.cpp b/src/owned.cpp index 6995a39..810bed7 100644 --- a/src/owned.cpp +++ b/src/owned.cpp @@ -47,6 +47,14 @@ void zmq::owned_t::process_plug () finalise_command (); } +void zmq::owned_t::process_attach (zmq_engine_t *engine_) +{ + // Keep track of how many commands were processed so far. + processed_seqnum++; + + finalise_command (); +} + void zmq::owned_t::term () { send_term_req (owner, this); @@ -65,8 +73,8 @@ void zmq::owned_t::finalise_command () // If termination request was already received and there are no more // commands to wait for, terminate the object. if (shutting_down && processed_seqnum == sent_seqnum.get ()) { - send_term_ack (owner); process_unplug (); + send_term_ack (owner); delete this; } } diff --git a/src/owned.hpp b/src/owned.hpp index 22595d1..78036a3 100644 --- a/src/owned.hpp +++ b/src/owned.hpp @@ -59,6 +59,10 @@ namespace zmq // handler. void process_plug (); + // It's vital that session invokes io_object_t::process_attach + // at the end of it's own attach handler. + void process_attach (class zmq_engine_t *engine_); + // io_object_t defines a new handler used to disconnect the object // from the poller object. Implement the handlen in the derived // classes to ensure sane cleanup. diff --git a/src/pipe.hpp b/src/pipe.hpp index d771120..28e4b4d 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -29,7 +29,7 @@ namespace zmq { // Message pipe. - class pipe_t : public ypipe_t <zmq_msg, false, message_pipe_granularity> + class pipe_t : public ypipe_t <zmq_msg_t, false, message_pipe_granularity> { }; diff --git a/src/session.cpp b/src/session.cpp index 2bb4ff6..fc1f858 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -22,9 +22,10 @@ #include "err.hpp" zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - zmq_engine_t *engine_) : + const char *name_) : owned_t (parent_, owner_), - engine (engine_) + engine (NULL), + name (name_) { } @@ -32,12 +33,12 @@ zmq::session_t::~session_t () { } -bool zmq::session_t::read (::zmq_msg *msg_) +bool zmq::session_t::read (::zmq_msg_t *msg_) { return false; } -bool zmq::session_t::write (::zmq_msg *msg_) +bool zmq::session_t::write (::zmq_msg_t *msg_) { return false; } @@ -48,14 +49,34 @@ void zmq::session_t::flush () void zmq::session_t::process_plug () { - zmq_assert (engine); - engine->plug (this); + // Register the session with the socket. + bool ok = owner->register_session (name.c_str (), this); + + // There's already a session with the specified identity. + // We should syslog it and drop the session. TODO + zmq_assert (ok); + owned_t::process_plug (); } void zmq::session_t::process_unplug () { - zmq_assert (engine); - engine->unplug (); - delete engine; + // Unregister the session from the socket. + bool ok = owner->unregister_session (name.c_str ()); + zmq_assert (ok); + + if (engine) { + engine->unplug (); + delete engine; + engine = NULL; + } +} + +void zmq::session_t::process_attach (class zmq_engine_t *engine_) +{ + zmq_assert (engine_); + engine = engine_; + engine->plug (this); + + owned_t::process_attach (engine_); } diff --git a/src/session.hpp b/src/session.hpp index 2cb8e18..6d6bcf7 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -20,8 +20,11 @@ #ifndef __ZMQ_SESSION_HPP_INCLUDED__ #define __ZMQ_SESSION_HPP_INCLUDED__ +#include <string> + #include "i_inout.hpp" #include "owned.hpp" +#include "options.hpp" namespace zmq { @@ -30,24 +33,28 @@ namespace zmq { public: - session_t (object_t *parent_, socket_base_t *owner_, - class zmq_engine_t *engine_); + session_t (object_t *parent_, socket_base_t *owner_, const char *name_); private: ~session_t (); // i_inout interface implementation. - bool read (::zmq_msg *msg_); - bool write (::zmq_msg *msg_); + bool read (::zmq_msg_t *msg_); + bool write (::zmq_msg_t *msg_); void flush (); // Handlers for incoming commands. void process_plug (); void process_unplug (); + void process_attach (class zmq_engine_t *engine_); class zmq_engine_t *engine; + // The name of the session. One that is used to register it with + // socket-level repository of sessions. + std::string name; + session_t (const session_t&); void operator = (const session_t&); }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fa6c1e3..fb7bdcf 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -17,6 +17,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <string> #include <algorithm> #include "../include/zmq.h" @@ -30,16 +31,20 @@ #include "session.hpp" #include "config.hpp" #include "owned.hpp" +#include "uuid.hpp" zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), pending_term_acks (0), - app_thread (parent_) + app_thread (parent_), + shutting_down (false) { } zmq::socket_base_t::~socket_base_t () { + shutting_down = true; + while (true) { // On third pass of the loop there should be no more I/O objects @@ -64,10 +69,12 @@ zmq::socket_base_t::~socket_base_t () } // Check whether there are no session leaks. + sessions_sync.lock (); zmq_assert (sessions.empty ()); + sessions_sync.unlock (); } -int zmq::socket_base_t::setsockopt (int option_, void *optval_, +int zmq::socket_base_t::setsockopt (int option_, const void *optval_, size_t optvallen_) { switch (option_) { @@ -113,11 +120,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_, return 0; case ZMQ_IDENTITY: - if (optvallen_ != sizeof (const char*)) { - errno = EINVAL; - return -1; - } - options.identity = (const char*) optval_; + options.identity.assign ((const char*) optval_, optvallen_); return 0; default: @@ -141,18 +144,34 @@ int zmq::socket_base_t::bind (const char *addr_) int zmq::socket_base_t::connect (const char *addr_) { + // Generate a unique name for the session. + std::string session_name ("#"); + session_name += uuid_t ().to_string (); + + // Create the session. + io_thread_t *io_thread = choose_io_thread (options.affinity); + session_t *session = new session_t (io_thread, this, session_name.c_str ()); + zmq_assert (session); + send_plug (session); + send_own (this, session); + + // Create the connecter object. Supply it with the session name so that + // it can bind the new connection to the session once it is established. zmq_connecter_t *connecter = new zmq_connecter_t ( - choose_io_thread (options.affinity), this, options); + choose_io_thread (options.affinity), this, options, + session_name.c_str ()); int rc = connecter->set_address (addr_); - if (rc != 0) + if (rc != 0) { + delete connecter; return -1; - + } send_plug (connecter); send_own (this, connecter); + return 0; } -int zmq::socket_base_t::send (struct zmq_msg *msg_, int flags_) +int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) { zmq_assert (false); } @@ -162,7 +181,7 @@ int zmq::socket_base_t::flush () zmq_assert (false); } -int zmq::socket_base_t::recv (struct zmq_msg *msg_, int flags_) +int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) { zmq_assert (false); } @@ -174,35 +193,40 @@ int zmq::socket_base_t::close () return 0; } -void zmq::socket_base_t::register_session (const char *name_, +bool zmq::socket_base_t::register_session (const char *name_, session_t *session_) { sessions_sync.lock (); - bool inserted = sessions.insert (std::make_pair (name_, session_)).second; - zmq_assert (inserted); + bool registered = sessions.insert (std::make_pair (name_, session_)).second; sessions_sync.unlock (); + return registered; } -void zmq::socket_base_t::unregister_session (const char *name_) +bool zmq::socket_base_t::unregister_session (const char *name_) { sessions_sync.lock (); sessions_t::iterator it = sessions.find (name_); - zmq_assert (it != sessions.end ()); + bool unregistered = (it != sessions.end ()); sessions.erase (it); sessions_sync.unlock (); + return unregistered; } -zmq::session_t *zmq::socket_base_t::get_session (const char *name_) +zmq::session_t *zmq::socket_base_t::find_session (const char *name_) { sessions_sync.lock (); + sessions_t::iterator it = sessions.find (name_); - session_t *session = NULL; - if (it != sessions.end ()) { - session = it->second; - session->inc_seqnum (); - } + if (it == sessions.end ()) { + sessions_sync.unlock (); + return NULL; + } + + // Prepare the session for subsequent attach command. + it->second->inc_seqnum (); + sessions_sync.unlock (); - return session; + return it->second; } void zmq::socket_base_t::process_own (owned_t *object_) @@ -212,6 +236,11 @@ void zmq::socket_base_t::process_own (owned_t *object_) void zmq::socket_base_t::process_term_req (owned_t *object_) { + // When shutting down we can ignore termination requests from owned + // objects. They are going to be terminated anyway. + if (shutting_down) + return; + // If I/O object is well and alive ask it to terminate. io_objects_t::iterator it = std::find (io_objects.begin (), io_objects.end (), object_); diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 8e99654..20ac4e2 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -40,23 +40,22 @@ namespace zmq ~socket_base_t (); // Interface for communication with the API layer. - virtual int setsockopt (int option_, void *optval_, size_t optvallen_); + virtual int setsockopt (int option_, const void *optval_, + size_t optvallen_); virtual int bind (const char *addr_); virtual int connect (const char *addr_); - virtual int send (struct zmq_msg *msg_, int flags_); + virtual int send (struct zmq_msg_t *msg_, int flags_); virtual int flush (); - virtual int recv (struct zmq_msg *msg_, int flags_); + virtual int recv (struct zmq_msg_t *msg_, int flags_); virtual int close (); - // Functions that owned objects use to manipulate socket's list - // of existing sessions. - // Note that this functionality cannot be implemented via inter-thread + // The list of sessions cannot be accessed via inter-thread // commands as it is unacceptable to wait for the completion of the // action till user application yields control of the application // thread to 0MQ. - void register_session (const char *name_, class session_t *session_); - void unregister_session (const char *name_); - class session_t *get_session (const char *name_); + bool register_session (const char *name_, class session_t *session_); + bool unregister_session (const char *name_); + class session_t *find_session (const char *name_); private: @@ -80,10 +79,17 @@ namespace zmq // Socket options. options_t options; + // If true, socket is already shutting down. No new work should be + // started. + bool shutting_down; + // List of existing sessions. This list is never referenced from within // the socket, instead it is used by I/O objects owned by the session. // As those objects can live in different threads, the access is // synchronised using 'sessions_sync' mutex. + // Local sessions are those named by the local instance of 0MQ. + // Remote sessions are the sessions who's identities are provided by + // the remote party. typedef std::map <std::string, session_t*> sessions_t; sessions_t sessions; mutex_t sessions_sync; diff --git a/src/zmq.cpp b/src/zmq.cpp index 7ebe003..49096ad 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -26,76 +26,79 @@ #include "socket_base.hpp" #include "err.hpp" #include "dispatcher.hpp" -#include "msg.hpp" +#include "msg_content.hpp" -int zmq_msg_init (zmq_msg *msg_) +int zmq_msg_init (zmq_msg_t *msg_) { - msg_->content = (zmq_msg_content*) ZMQ_VSM; + msg_->content = (zmq::msg_content_t*) ZMQ_VSM; msg_->vsm_size = 0; return 0; } -int zmq_msg_init_size (zmq_msg *msg_, size_t size_) +int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) { if (size_ <= ZMQ_MAX_VSM_SIZE) { - msg_->content = (zmq_msg_content*) ZMQ_VSM; + msg_->content = (zmq::msg_content_t*) ZMQ_VSM; msg_->vsm_size = (uint16_t) size_; } else { - msg_->content = (zmq_msg_content*) malloc (sizeof (zmq_msg_content) + - size_); + msg_->content = + (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_); if (!msg_->content) { errno = ENOMEM; return -1; } msg_->shared = 0; - - msg_->content->data = (void*) (msg_->content + 1); - msg_->content->size = size_; - msg_->content->ffn = NULL; - new (&msg_->content->refcnt) zmq::atomic_counter_t (); + + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; + content->data = (void*) (content + 1); + content->size = size_; + content->ffn = NULL; + new (&content->refcnt) zmq::atomic_counter_t (); } return 0; } -int zmq_msg_init_data (zmq_msg *msg_, void *data_, size_t size_, +int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_) { msg_->shared = 0; - msg_->content = (zmq_msg_content*) malloc (sizeof (zmq_msg_content)); + msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t)); zmq_assert (msg_->content); - msg_->content->data = data_; - msg_->content->size = size_; - msg_->content->ffn = ffn_; - new (&msg_->content->refcnt) zmq::atomic_counter_t (); + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; + content->data = data_; + content->size = size_; + content->ffn = ffn_; + new (&content->refcnt) zmq::atomic_counter_t (); return 0; } -int zmq_msg_close (zmq_msg *msg_) +int zmq_msg_close (zmq_msg_t *msg_) { - // For VSMs and delimiters there are no resources to free - if (msg_->content == (zmq_msg_content*) ZMQ_DELIMITER || - msg_->content == (zmq_msg_content*) ZMQ_VSM || - msg_->content == (zmq_msg_content*) ZMQ_GAP) + // For VSMs and delimiters there are no resources to free. + if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER || + msg_->content == (zmq::msg_content_t*) ZMQ_VSM || + msg_->content == (zmq::msg_content_t*) ZMQ_GAP) return 0; - // If the content is not shared, or if it is shared and the reference + // If the content is not shared, or if it is shared and the reference. // count has dropped to zero, deallocate it. - if (!msg_->shared || !msg_->content->refcnt.sub (1)) { + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; + if (!msg_->shared || !content->refcnt.sub (1)) { - // We used "placement new" operator to initialize the reference + // We used "placement new" operator to initialize the reference. // counter so we call its destructor now. - msg_->content->refcnt.~atomic_counter_t (); + content->refcnt.~atomic_counter_t (); - if (msg_->content->ffn) - msg_->content->ffn (msg_->content->data); - free (msg_->content); + if (content->ffn) + content->ffn (content->data); + free (content); } return 0; } -int zmq_msg_move (zmq_msg *dest_, zmq_msg *src_) +int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_) { zmq_msg_close (dest_); *dest_ = *src_; @@ -103,23 +106,24 @@ int zmq_msg_move (zmq_msg *dest_, zmq_msg *src_) return 0; } -int zmq_msg_copy (zmq_msg *dest_, zmq_msg *src_) +int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) { zmq_msg_close (dest_); // VSMs and delimiters require no special handling. if (src_->content != - (zmq_msg_content*) ZMQ_DELIMITER && - src_->content != (zmq_msg_content*) ZMQ_VSM && - src_->content != (zmq_msg_content*) ZMQ_GAP) { + (zmq::msg_content_t*) ZMQ_DELIMITER && + src_->content != (zmq::msg_content_t*) ZMQ_VSM && + src_->content != (zmq::msg_content_t*) ZMQ_GAP) { // One reference is added to shared messages. Non-shared messages // are turned into shared messages and reference count is set to 2. + zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content; if (src_->shared) - src_->content->refcnt.add (1); + content->refcnt.add (1); else { src_->shared = true; - src_->content->refcnt.set (2); + content->refcnt.set (2); } } @@ -127,32 +131,34 @@ int zmq_msg_copy (zmq_msg *dest_, zmq_msg *src_) return 0; } -void *zmq_msg_data (zmq_msg *msg_) +void *zmq_msg_data (zmq_msg_t *msg_) { - if (msg_->content == (zmq_msg_content*) ZMQ_VSM) + if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) return msg_->vsm_data; if (msg_->content == - (zmq_msg_content*) ZMQ_DELIMITER || - msg_->content == (zmq_msg_content*) ZMQ_GAP) + (zmq::msg_content_t*) ZMQ_DELIMITER || + msg_->content == (zmq::msg_content_t*) ZMQ_GAP) return NULL; - return msg_->content->data; + + return ((zmq::msg_content_t*) msg_->content)->data; } -size_t zmq_msg_size (zmq_msg *msg_) +size_t zmq_msg_size (zmq_msg_t *msg_) { - if (msg_->content == (zmq_msg_content*) ZMQ_VSM) + if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) return msg_->vsm_size; if (msg_->content == - (zmq_msg_content*) ZMQ_DELIMITER || - msg_->content == (zmq_msg_content*) ZMQ_GAP) + (zmq::msg_content_t*) ZMQ_DELIMITER || + msg_->content == (zmq::msg_content_t*) ZMQ_GAP) return 0; - return msg_->content->size; + + return ((zmq::msg_content_t*) msg_->content)->size; } -int zmq_msg_type (zmq_msg *msg_) +int zmq_msg_type (zmq_msg_t *msg_) { // If it's a genuine message, return 0. - if (msg_->content >= (zmq_msg_content*) ZMQ_VSM) + if (msg_->content >= (zmq::msg_content_t*) ZMQ_VSM) return 0; // Trick the compiler to believe that content is an integer. @@ -192,7 +198,8 @@ int zmq_close (void *s_) return 0; } -int zmq_setsockopt (void *s_, int option_, void *optval_, size_t optvallen_) +int zmq_setsockopt (void *s_, int option_, const void *optval_, + size_t optvallen_) { return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_, optvallen_)); @@ -208,7 +215,7 @@ int zmq_connect (void *s_, const char *addr_) return (((zmq::socket_base_t*) s_)->connect (addr_)); } -int zmq_send (void *s_, zmq_msg *msg_, int flags_) +int zmq_send (void *s_, zmq_msg_t *msg_, int flags_) { return (((zmq::socket_base_t*) s_)->send (msg_, flags_)); } @@ -218,7 +225,7 @@ int zmq_flush (void *s_) return (((zmq::socket_base_t*) s_)->flush ()); } -int zmq_recv (void *s_, zmq_msg *msg_, int flags_) +int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) { return (((zmq::so |