diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-04-21 22:27:48 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-04-21 22:27:48 +0200 |
commit | e0246e32d79d71f8e73207b43aed8b23648e4fc7 (patch) | |
tree | 9952ee6fd39f4e27bbe932f6b6f30f0073009369 /src | |
parent | 581697695aac72894f2d3fefac904b9d50b3ba67 (diff) |
Message-related functionality factored out into msg_t class.
This patch addresses serveral issues:
1. It gathers message related functionality scattered over whole
codebase into a single class.
2. It makes zmq_msg_t an opaque datatype. Internals of the class
don't pollute zmq.h header file.
3. zmq_msg_t size decreases from 48 to 32 bytes. That saves ~33%
of memory in scenarios with large amount of small messages.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
54 files changed, 605 insertions, 446 deletions
diff --git a/src/config.hpp b/src/config.hpp index 3df66c7..dff3f87 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -36,6 +36,10 @@ namespace zmq // memory allocation by approximately 99.6% message_pipe_granularity = 256, + // Size in bytes of the largest message that is still copied around + // rather than being reference-counted. + max_vsm_size = 29, + // Determines how often does socket poll for new commands when it // still has unprocessed messages to handle. Thus, if it is set to 100, // socket will process 100 inbound messages before doing the poll. diff --git a/src/ctx.cpp b/src/ctx.cpp index 2758729..fb5420d 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -26,8 +26,9 @@ #include "io_thread.hpp" #include "platform.hpp" #include "reaper.hpp" -#include "err.hpp" #include "pipe.hpp" +#include "err.hpp" +#include "msg.hpp" #if defined ZMQ_HAVE_WINDOWS #include "windows.h" @@ -304,10 +305,10 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) void zmq::ctx_t::log (const char *format_, va_list args_) { // Create the log message. - zmq_msg_t msg; - int rc = zmq_msg_init_size (&msg, strlen (format_) + 1); - zmq_assert (rc == 0); - memcpy (zmq_msg_data (&msg), format_, zmq_msg_size (&msg)); + msg_t msg; + int rc = msg.init_size (strlen (format_) + 1); + errno_assert (rc == 0); + memcpy (msg.data (), format_, msg.size ()); // At this point we migrate the log socket to the current thread. // We rely on mutex for executing the memory barrier. @@ -316,7 +317,8 @@ void zmq::ctx_t::log (const char *format_, va_list args_) log_socket->send (&msg, 0); log_sync.unlock (); - zmq_msg_close (&msg); + rc = msg.close (); + errno_assert (rc == 0); } diff --git a/src/ctx.hpp b/src/ctx.hpp index 33d5dad..7d865fa 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -26,8 +26,6 @@ #include <string> #include <stdarg.h> -#include "../include/zmq.h" - #include "mailbox.hpp" #include "semaphore.hpp" #include "ypipe.hpp" diff --git a/src/decoder.cpp b/src/decoder.cpp index efb39e8..bcf5974 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -31,7 +31,8 @@ zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) : destination (NULL), maxmsgsize (maxmsgsize_) { - zmq_msg_init (&in_progress); + int rc = in_progress.init (); + errno_assert (rc == 0); // At the beginning, read one byte and go to one_byte_size_ready state. next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready); @@ -39,7 +40,8 @@ zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) : zmq::decoder_t::~decoder_t () { - zmq_msg_close (&in_progress); + int rc = in_progress.close (); + errno_assert (rc == 0); } void zmq::decoder_t::set_inout (i_inout *destination_) @@ -71,9 +73,9 @@ bool zmq::decoder_t::one_byte_size_ready () errno = ENOMEM; } else - rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1); + rc = in_progress.init_size (*tmpbuf - 1); if (rc != 0 && errno == ENOMEM) { - rc = zmq_msg_init (&in_progress); + rc = in_progress.init (); errno_assert (rc == 0); decoding_error (); return false; @@ -106,9 +108,9 @@ bool zmq::decoder_t::eight_byte_size_ready () errno = ENOMEM; } else - rc = zmq_msg_init_size (&in_progress, size - 1); + rc = in_progress.init_size (size - 1); if (rc != 0 && errno == ENOMEM) { - rc = zmq_msg_init (&in_progress); + rc = in_progress.init (); errno_assert (rc == 0); decoding_error (); return false; @@ -122,9 +124,9 @@ bool zmq::decoder_t::eight_byte_size_ready () bool zmq::decoder_t::flags_ready () { // Store the flags from the wire into the message structure. - in_progress.flags = tmpbuf [0]; + in_progress.set_flags (tmpbuf [0]); - next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), + next_step (in_progress.data (), in_progress.size (), &decoder_t::message_ready); return true; diff --git a/src/decoder.hpp b/src/decoder.hpp index 23806a3..114ecef 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -27,10 +27,9 @@ #include <algorithm> #include "err.hpp" +#include "msg.hpp" #include "stdint.hpp" -#include "../include/zmq.h" - namespace zmq { @@ -196,7 +195,7 @@ namespace zmq struct i_inout *destination; unsigned char tmpbuf [8]; - ::zmq_msg_t in_progress; + msg_t in_progress; int64_t maxmsgsize; diff --git a/src/dist.cpp b/src/dist.cpp index 9d50368..093da79 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -18,8 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "dist.hpp" #include "pipe.hpp" #include "err.hpp" @@ -89,10 +87,10 @@ void zmq::dist_t::activated (writer_t *pipe_) active++; } -int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) +int zmq::dist_t::send (msg_t *msg_, int flags_) { // Is this end of a multipart message? - bool msg_more = msg_->flags & ZMQ_MSG_MORE; + bool msg_more = msg_->flags () & msg_t::more; // Push the message to active pipes. distribute (msg_, flags_); @@ -106,63 +104,33 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) return 0; } -void zmq::dist_t::distribute (zmq_msg_t *msg_, int flags_) +void zmq::dist_t::distribute (msg_t *msg_, int flags_) { // If there are no active pipes available, simply drop the message. if (active == 0) { - int rc = zmq_msg_close (msg_); - zmq_assert (rc == 0); - rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); - return; - } - - msg_content_t *content = (msg_content_t*) msg_->content; - - // For VSMs the copying is straighforward. - if (content == (msg_content_t*) ZMQ_VSM) { - for (pipes_t::size_type i = 0; i < active;) - if (write (pipes [i], msg_)) - i++; - int rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); - return; - } - - // Optimisation for the case when there's only a single pipe - // to send the message to - no refcount adjustment i.e. no atomic - // operations are needed. - if (active == 1) { - if (!write (pipes [0], msg_)) { - int rc = zmq_msg_close (msg_); - zmq_assert (rc == 0); - } - int rc = zmq_msg_init (msg_); + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); zmq_assert (rc == 0); return; } - // There are at least 2 destinations for the message. That means we have - // to deal with reference counting. First add N-1 references to - // the content (we are holding one reference anyway, that's why -1). - if (msg_->flags & ZMQ_MSG_SHARED) - content->refcnt.add (active - 1); - else { - content->refcnt.set (active); - msg_->flags |= ZMQ_MSG_SHARED; - } + // Add active-1 references to the message. We already hold one reference, + // that's why -1. + msg_->add_refs (active - 1); - // Push the message to all destinations. + // Push copy of the message to each active pipe. for (pipes_t::size_type i = 0; i < active;) { if (!write (pipes [i], msg_)) - content->refcnt.sub (1); + msg_->rm_refs (1); else i++; } - // Detach the original message from the data buffer. - int rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); + // Detach the original message from the data buffer. Note that we don't + // close the message. That's because we've already used all the references. + int rc = msg_->init (); + errno_assert (rc == 0); } bool zmq::dist_t::has_out () @@ -170,14 +138,14 @@ bool zmq::dist_t::has_out () return true; } -bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_) +bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) { if (!pipe_->write (msg_)) { active--; pipes.swap (pipes.index (pipe_), active); return false; } - if (!(msg_->flags & ZMQ_MSG_MORE)) + if (!(msg_->flags () & msg_t::more)) pipe_->flush (); return true; } diff --git a/src/dist.hpp b/src/dist.hpp index ad9767a..ea05305 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -40,7 +40,7 @@ namespace zmq void attach (writer_t *pipe_); void terminate (); - int send (zmq_msg_t *msg_, int flags_); + int send (class msg_t *msg_, int flags_); bool has_out (); // i_writer_events interface implementation. @@ -51,10 +51,10 @@ namespace zmq // Write the message to the pipe. Make the pipe inactive if writing // fails. In such a case false is returned. - bool write (class writer_t *pipe_, zmq_msg_t *msg_); + bool write (class writer_t *pipe_, class msg_t *msg_); // Put the message to all active pipes. - void distribute (zmq_msg_t *msg_, int flags_); + void distribute (class msg_t *msg_, int flags_); // Plug in all the delayed pipes. void clear_new_pipes (); diff --git a/src/encoder.cpp b/src/encoder.cpp index 88e1dff..a42f06f 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -26,7 +26,8 @@ zmq::encoder_t::encoder_t (size_t bufsize_) : encoder_base_t <encoder_t> (bufsize_), source (NULL) { - zmq_msg_init (&in_progress); + int rc = in_progress.init (); + errno_assert (rc == 0); // Write 0 bytes to the batch and go to message_ready state. next_step (NULL, 0, &encoder_t::message_ready, true); @@ -34,7 +35,8 @@ zmq::encoder_t::encoder_t (size_t bufsize_) : zmq::encoder_t::~encoder_t () { - zmq_msg_close (&in_progress); + int rc = in_progress.close (); + errno_assert (rc == 0); } void zmq::encoder_t::set_inout (i_inout *source_) @@ -45,7 +47,7 @@ void zmq::encoder_t::set_inout (i_inout *source_) bool zmq::encoder_t::size_ready () { // Write message body into the buffer. - next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), + next_step (in_progress.data (), in_progress.size (), &encoder_t::message_ready, false); return true; } @@ -53,19 +55,21 @@ bool zmq::encoder_t::size_ready () bool zmq::encoder_t::message_ready () { // Destroy content of the old message. - zmq_msg_close (&in_progress); + int rc = in_progress.close (); + errno_assert (rc == 0); // Read new message. If there is none, return false. // Note that new state is set only if write is successful. That way // unsuccessful write will cause retry on the next state machine // invocation. if (!source || !source->read (&in_progress)) { - zmq_msg_init (&in_progress); + rc = in_progress.init (); + errno_assert (rc == 0); return false; } // Get the message size. - size_t size = zmq_msg_size (&in_progress); + size_t size = in_progress.size (); // Account for the 'flags' byte. size++; @@ -75,16 +79,16 @@ bool zmq::encoder_t::message_ready () // message size. In both cases 'flags' field follows. if (size < 255) { tmpbuf [0] = (unsigned char) size; - tmpbuf [1] = (in_progress.flags & ~ZMQ_MSG_SHARED); + tmpbuf [1] = (in_progress.flags () & ~msg_t::shared); next_step (tmpbuf, 2, &encoder_t::size_ready, - !(in_progress.flags & ZMQ_MSG_MORE)); + !(in_progress.flags () & msg_t::more)); } else { tmpbuf [0] = 0xff; put_uint64 (tmpbuf + 1, size); - tmpbuf [9] = (in_progress.flags & ~ZMQ_MSG_SHARED); + tmpbuf [9] = (in_progress.flags () & ~msg_t::shared); next_step (tmpbuf, 10, &encoder_t::size_ready, - !(in_progress.flags & ZMQ_MSG_MORE)); + !(in_progress.flags () & msg_t::more)); } return true; } diff --git a/src/encoder.hpp b/src/encoder.hpp index 918ec2b..617b65b 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -27,8 +27,7 @@ #include <algorithm> #include "err.hpp" - -#include "../include/zmq.h" +#include "msg.hpp" namespace zmq { @@ -172,7 +171,7 @@ namespace zmq bool message_ready (); struct i_inout *source; - ::zmq_msg_t in_progress; + msg_t in_progress; unsigned char tmpbuf [10]; encoder_t (const encoder_t&); diff --git a/src/err.cpp b/src/err.cpp index 8761c22..87a0006 100644 --- a/src/err.cpp +++ b/src/err.cpp @@ -18,8 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "err.hpp" #include "platform.hpp" diff --git a/src/err.hpp b/src/err.hpp index 3ffd99d..6289a08 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -21,6 +21,9 @@ #ifndef __ZMQ_ERR_HPP_INCLUDED__ #define __ZMQ_ERR_HPP_INCLUDED__ +// 0MQ-specific error codes are defined in zmq.h +#include "../include/zmq.h" + #include <assert.h> #include <errno.h> #include <string.h> @@ -18,12 +18,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "fq.hpp" #include "pipe.hpp" #include "err.hpp" #include "own.hpp" +#include "msg.hpp" zmq::fq_t::fq_t (own_t *sink_) : active (0), @@ -95,10 +94,11 @@ void zmq::fq_t::activated (reader_t *pipe_) active++; } -int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_) +int zmq::fq_t::recv (msg_t *msg_, int flags_) { // Deallocate old content of the message. - zmq_msg_close (msg_); + int rc = msg_->close (); + errno_assert (rc == 0); // Round-robin over the pipes to get the next message. for (int count = active; count != 0; count--) { @@ -116,7 +116,7 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_) // and replaced by another active pipe. Thus we don't have to increase // the 'current' pointer. if (fetched) { - more = msg_->flags & ZMQ_MSG_MORE; + more = msg_->flags () & msg_t::more; if (!more) { current++; if (current >= active) @@ -134,7 +134,8 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_) // No message is available. Initialise the output parameter // to be a 0-byte message. - zmq_msg_init (msg_); + rc = msg_->init (); + errno_assert (rc == 0); errno = EAGAIN; return -1; } @@ -23,6 +23,7 @@ #include "array.hpp" #include "pipe.hpp" +#include "msg.hpp" namespace zmq { @@ -40,7 +41,7 @@ namespace zmq void attach (reader_t *pipe_); void terminate (); - int recv (zmq_msg_t *msg_, int flags_); + int recv (msg_t *msg_, int flags_); bool has_in (); // i_reader_events implementation. diff --git a/src/i_inout.hpp b/src/i_inout.hpp index 057b46c..3f8e8e0 100644 --- a/src/i_inout.hpp +++ b/src/i_inout.hpp @@ -21,8 +21,7 @@ #ifndef __ZMQ_I_INOUT_HPP_INCLUDED__ #define __ZMQ_I_INOUT_HPP_INCLUDED__ -#include "../include/zmq.h" - +#include "msg.hpp" #include "stdint.hpp" namespace zmq @@ -33,10 +32,10 @@ namespace zmq virtual ~i_inout () {} // Engine asks for a message to send to the network. - virtual bool read (::zmq_msg_t *msg_) = 0; + virtual bool read (msg_t *msg_) = 0; // Engine received message from the network and sends it further on. - virtual bool write (::zmq_msg_t *msg_) = 0; + virtual bool write (msg_t *msg_) = 0; // Flush all the previously written messages. virtual void flush () = 0; diff --git a/src/io_thread.cpp b/src/io_thread.cpp index be52bdd..9678392 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -20,8 +20,6 @@ #include <new> -#include "../include/zmq.h" - #include "io_thread.hpp" #include "platform.hpp" #include "err.hpp" @@ -23,11 +23,9 @@ #include <stdlib.h> #include <string> -#include "../include/zmq.h" - #include "ip.hpp" -#include "platform.hpp" #include "err.hpp" +#include "platform.hpp" #include "stdint.hpp" #if defined ZMQ_HAVE_SOLARIS @@ -18,12 +18,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "lb.hpp" #include "pipe.hpp" #include "err.hpp" #include "own.hpp" +#include "msg.hpp" zmq::lb_t::lb_t (own_t *sink_) : active (0), @@ -93,26 +92,26 @@ void zmq::lb_t::activated (writer_t *pipe_) active++; } -int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) +int zmq::lb_t::send (msg_t *msg_, int flags_) { // Drop the message if required. If we are at the end of the message // switch back to non-dropping mode. if (dropping) { - more = msg_->flags & ZMQ_MSG_MORE; + more = msg_->flags () & msg_t::more; if (!more) dropping = false; - int rc = zmq_msg_close (msg_); + int rc = msg_->close (); errno_assert (rc == 0); - rc = zmq_msg_init (msg_); + rc = msg_->init (); zmq_assert (rc == 0); return 0; } while (active > 0) { if (pipes [current]->write (msg_)) { - more = msg_->flags & ZMQ_MSG_MORE; + more = msg_->flags () & msg_t::more; break; } @@ -138,8 +137,8 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) } // Detach the message from the data buffer. - int rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); + int rc = msg_->init (); + errno_assert (rc == 0); return 0; } @@ -154,13 +153,16 @@ bool zmq::lb_t::has_out () while (active > 0) { // Check whether zero-sized message can be written to the pipe. - zmq_msg_t msg; - zmq_msg_init (&msg); + msg_t msg; + int rc = msg.init (); + errno_assert (rc == 0); if (pipes [current]->check_write (&msg)) { - zmq_msg_close (&msg); + rc = msg.close (); + errno_assert (rc == 0); return true; } - zmq_msg_close (&msg); + rc = msg.close (); + errno_assert (rc == 0); // Deactivate the pipe. active--; @@ -38,7 +38,7 @@ namespace zmq void attach (writer_t *pipe_); void terminate (); - int send (zmq_msg_t *msg_, int flags_); + int send (msg_t *msg_, int flags_); bool has_out (); // i_writer_events interface implementation. diff --git a/src/msg.cpp b/src/msg.cpp index e800bd6..bd6f066 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -29,155 +29,234 @@ #include "likely.hpp" #include "err.hpp" -int zmq_msg_init (zmq_msg_t *msg_) +bool zmq::msg_t::check () { - msg_->content = (zmq::msg_content_t*) ZMQ_VSM; - msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; - msg_->vsm_size = 0; + return u.base.type >= type_min && u.base.type <= type_max; +} + +int zmq::msg_t::init () +{ + u.vsm.type = type_vsm; + u.vsm.flags = 0; + u.vsm.size = 0; return 0; } -int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) +int zmq::msg_t::init_size (size_t size_) { - if (size_ <= ZMQ_MAX_VSM_SIZE) { - msg_->content = (zmq::msg_content_t*) ZMQ_VSM; - msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; - msg_->vsm_size = (uint8_t) size_; + if (size_ <= max_vsm_size) { + u.vsm.type = type_vsm; + u.vsm.flags = 0; + u.vsm.size = (unsigned char) size_; } else { - msg_->content = - (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_); - if (!msg_->content) { + u.lmsg.type = type_lmsg; + u.lmsg.flags = 0; + u.lmsg.content = + (content_t*) malloc (sizeof (content_t) + size_); + if (!u.lmsg.content) { errno = ENOMEM; return -1; } - msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; - - zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; - content->data = (void*) (content + 1); - content->size = size_; - content->ffn = NULL; - content->hint = NULL; - new (&content->refcnt) zmq::atomic_counter_t (); + + u.lmsg.content->data = u.lmsg.content + 1; + u.lmsg.content->size = size_; + u.lmsg.content->ffn = NULL; + u.lmsg.content->hint = NULL; + new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); } return 0; } -int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, - zmq_free_fn *ffn_, void *hint_) +int zmq::msg_t::init_data (void *data_, size_t size_, zmq_free_fn *ffn_, + void *hint_) { - msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t)); - alloc_assert (msg_->content); - msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; - zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; - content->data = data_; - content->size = size_; - content->ffn = ffn_; - content->hint = hint_; - new (&content->refcnt) zmq::atomic_counter_t (); + u.lmsg.type = type_lmsg; + u.lmsg.flags = 0; + u.lmsg.content = (content_t*) malloc (sizeof (content_t)); + alloc_assert (u.lmsg.content); + + u.lmsg.content->data = data_; + u.lmsg.content->size = size_; + u.lmsg.content->ffn = ffn_; + u.lmsg.content->hint = hint_; + new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); return 0; + } -int zmq_msg_close (zmq_msg_t *msg_) +int zmq::msg_t::init_delimiter () { - // Check the validity tag. - if (unlikely (msg_->flags | ZMQ_MSG_MASK) != 0xff) { + u.delimiter.type = type_delimiter; + u.delimiter.flags = 0; + return 0; +} + +int zmq::msg_t::close () +{ + // Check the validity of the message. + if (unlikely (!check ())) { errno = EFAULT; return -1; } - // For VSMs and delimiters there are no resources to free. - if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER && - msg_->content != (zmq::msg_content_t*) ZMQ_VSM) { + if (u.base.type == type_lmsg) { - // If the content is not shared, or if it is shared and the reference. + // If the content is not shared, or if it is shared and the reference // count has dropped to zero, deallocate it. - zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; - if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) { + if (!(u.lmsg.flags & msg_t::shared) || + !u.lmsg.content->refcnt.sub (1)) { - // We used "placement new" operator to initialize the reference. - // counter so we call its destructor now. - content->refcnt.~atomic_counter_t (); + // We used "placement new" operator to initialize the reference + // counter so we call the destructor explicitly now. + u.lmsg.content->refcnt.~atomic_counter_t (); - if (content->ffn) - content->ffn (content->data, content->hint); - free (content); + if (u.lmsg.content->ffn) + u.lmsg.content->ffn (u.lmsg.content->data, + u.lmsg.content->hint); + free (u.lmsg.content); } } - // Remove the validity tag from the message. - msg_->flags = 0; + // Make the message invalid. + u.base.type = 0; return 0; + } -int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_) +int zmq::msg_t::move (msg_t &src_) { - // Check the validity tags. - if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff || - (src_->flags | ZMQ_MSG_MASK) != 0xff)) { + // Check the validity of the source. + if (unlikely (!src_.check ())) { errno = EFAULT; return -1; } - zmq_msg_close (dest_); - *dest_ = *src_; - zmq_msg_init (src_); + int rc = close (); + if (unlikely (rc < 0)) + return rc; + + *this = src_; + + rc = src_.init (); + if (unlikely (rc < 0)) + return rc; + return 0; } -int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) +int zmq::msg_t::copy (msg_t &src_) { - // Check the validity tags. - if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff || - (src_->flags | ZMQ_MSG_MASK) != 0xff)) { + // Check the validity of the source. + if (unlikely (!src_.check ())) { errno = EFAULT; return -1; } - zmq_msg_close (dest_); + int rc = close (); + if (unlikely (rc < 0)) + return rc; - // VSMs and delimiters require no special handling. - if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER && - src_->content != (zmq::msg_content_t*) ZMQ_VSM) { + if (src_.u.base.type == type_lmsg) { // One reference is added to shared messages. Non-shared messages // are turned into shared messages and reference count is set to 2. - zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content; - if (src_->flags & ZMQ_MSG_SHARED) - content->refcnt.add (1); + if (src_.u.lmsg.flags & msg_t::shared) + src_.u.lmsg.content->refcnt.add (1); else { - src_->flags |= ZMQ_MSG_SHARED; - content->refcnt.set (2); + src_.u.lmsg.flags |= msg_t::shared; + src_.u.lmsg.content->refcnt.set (2); } } - *dest_ = *src_; + *this = src_; + return 0; + +} + +void *zmq::msg_t::data () +{ + // Check the validity of the message. + zmq_assert (check ()); + + switch (u.base.type) { + case type_vsm: + return u.vsm.data; + case type_lmsg: + return u.lmsg.content->data; + default: + zmq_assert (false); + } } -void *zmq_msg_data (zmq_msg_t *msg_) +size_t zmq::msg_t::size () { - // Check the validity tag. - zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff); + // Check the validity of the message. + zmq_assert (check ()); - if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) - return msg_->vsm_data; - if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) - return NULL; + switch (u.base.type) { + case type_vsm: + return u.vsm.size; + case type_lmsg: + return u.lmsg.content->size; + default: + zmq_assert (false); + } +} - return ((zmq::msg_content_t*) msg_->content)->data; +unsigned char zmq::msg_t::flags () +{ + return u.base.flags; } -size_t zmq_msg_size (zmq_msg_t *msg_) +void zmq::msg_t::set_flags (unsigned char flags_) { - // Check the validity tag. - zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff); + u.base.flags |= flags_; +} - if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) - return msg_->vsm_size; - if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) - return 0; +void zmq::msg_t::reset_flags (unsigned char flags_) +{ + u.base.flags &= ~flags_; +} + +bool zmq::msg_t::is_delimiter () +{ + return u.base.type == type_delimiter; +} + +void zmq::msg_t::add_refs (int refs_) +{ + zmq_assert (refs_ >= 0); - return ((zmq::msg_content_t*) msg_->content)->size; + // No copies required. + if (!refs_) + return; + + // VSMs and delimiters can be copied straight away. The only message type + // that needs special care are long messages. + if (u.base.type == type_lmsg) { + if (u.lmsg.flags & msg_t::shared) + u.lmsg.content->refcnt.add (refs_); + else { + u.lmsg.content->refcnt.set (refs_ + 1); + u.lmsg.flags |= msg_t::shared; + } + } +} + +void zmq::msg_t::rm_refs (int refs_) +{ + zmq_assert (refs_ >= 0); + + // No copies required. + if (!refs_) + return; + + // The only message type that needs special care are long messages. + if (u.base.type == type_lmsg) { + zmq_assert (u.lmsg.flags & msg_t::shared); + u.lmsg.content->refcnt.sub (refs_); + } } diff --git a/src/msg.hpp b/src/msg.hpp index 7e22098..b7d21f6 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -23,28 +23,105 @@ #include <stddef.h> -#include "../include/zmq.h" - +#include "config.hpp" #include "atomic_counter.hpp" namespace zmq { - // Shared message buffer. Message data are either allocated in one - // continuous block along with this structure - thus avoiding one - // malloc/free pair or they are stored in used-supplied memory. - // In the latter case, ffn member stores pointer to the function to be - // used to deallocate the data. If the buffer is actually shared (there - // are at least 2 references to it) refcount member contains number of - // references. + // Note that this structure needs to be explicitly constructed + // (init functions) and destructed (close function). - struct msg_content_t + class msg_t { - void *data; - size_t size; - zmq_free_fn *ffn; - void *hint; - zmq::atomic_counter_t refcnt; + public: + + // Mesage flags. + enum + { + more = 1, + shared = 128 + }; + + // Signature for free function to deallocate the message content. + typedef void (free_fn_t) (void *data, void *hint); + + bool check (); + int init (); + int init_size (size_t size_); + int init_data (void *data_, size_t size_, free_fn_t *ffn_, + void *hint_); + int init_delimiter (); + int close (); + int move (msg_t &src_); + int copy (msg_t &src_); + void *data (); + size_t size (); + unsigned char flags (); + void set_flags (unsigned char flags_); + void reset_flags (unsigned char flags_); + bool is_delimiter (); + + // After calling this function you can copy the message in POD-style + // refs_ times. No need to call copy. + void add_refs (int refs_); + + // Removes references previously added by add_refs. + void rm_refs (int refs_); + + private: + + // Shared message buffer. Message data are either allocated in one + // continuous block along with this structure - thus avoiding one + // malloc/free pair or they are stored in used-supplied memory. + // In the latter case, ffn member stores pointer to the function to be + // used to deallocate the data. If the buffer is actually shared (there + // are at least 2 references to it) refcount member contains number of + // references. + struct content_t + { + void *data; + size_t size; + free_fn_t *ffn; + void *hint; + zmq::atomic_counter_t refcnt; + }; + + // Different message types. + enum type_t + { + type_min = 101, + type_vsm = 101, + type_lmsg = 102, + type_delimiter = 103, + type_max = 103 + }; + + // Note that fields shared between different message types are not + // moved to tha parent class (msg_t). This way we ger tighter packing + // of the data. Shared fields can be accessed via 'base' member of + // the union. + union { + struct { + unsigned char type; + unsigned char flags; + } base; + struct { + unsigned char type; + unsigned char flags; + unsigned char size; + unsigned char data [max_vsm_size]; + } vsm; + struct { + unsigned char type; + unsigned char flags; + content_t *content; + } lmsg; + struct { + unsigned char type; + unsigned char flags; + } delimiter; + } u; }; } diff --git a/src/object.hpp b/src/object.hpp index 706303b..0f5e61b 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -21,8 +21,6 @@ #ifndef __ZMQ_OBJECT_HPP_INCLUDED__ #define __ZMQ_OBJECT_HPP_INCLUDED__ -#include "../include/zmq.h" - #include "stdint.hpp" #include "blob.hpp" diff --git a/src/options.cpp b/src/options.cpp index 556ffd8..399fd27 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -20,8 +20,6 @@ #include <string.h> -#include "../include/zmq.h" - #include "options.hpp" #include "err.hpp" diff --git a/src/pair.cpp b/src/pair.cpp index 1acc60f..d877b54 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -18,11 +18,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "pair.hpp" #include "err.hpp" #include "pipe.hpp" +#include "msg.hpp" zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) : socket_base_t (parent_, tid_), @@ -116,7 +115,7 @@ void zmq::pair_t::activated (class writer_t *pipe_) outpipe_alive = true; } -int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_) +int zmq::pair_t::xsend (msg_t *msg_, int flags_) { if (outpipe == NULL || !outpipe_alive) { errno = EAGAIN; @@ -133,16 +132,17 @@ int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_) outpipe->flush (); // Detach the original message from the data buffer. - int rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); + int rc = msg_->init (); + errno_assert (rc == 0); return 0; } -int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_) +int zmq::pair_t::xrecv (msg_t *msg_, int flags_) { // Deallocate old content of the message. - zmq_msg_close (msg_); + int rc = msg_->close (); + errno_assert (rc == 0); if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) { @@ -150,7 +150,8 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_) inpipe_alive = false; // Initialise the output parameter to be a 0-byte message. - zmq_msg_init (msg_); + rc = msg_->init (); + errno_assert (rc == 0); errno = EAGAIN; return -1; } @@ -171,10 +172,12 @@ bool zmq::pair_t::xhas_out () if (!outpipe || !outpipe_alive) return false; - zmq_msg_t msg; - zmq_msg_init (&msg); + msg_t msg; + int rc = msg.init (); + errno_assert (rc == 0); outpipe_alive = outpipe->check_write (&msg); - zmq_msg_close (&msg); + rc = msg.close (); + errno_assert (rc == 0); return outpipe_alive; } diff --git a/src/pair.hpp b/src/pair.hpp index 54e60b5..a10e15a 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -40,8 +40,8 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - int xsend (zmq_msg_t *msg_, int flags_); - int xrecv (zmq_msg_t *msg_, int flags_); + int xsend (class msg_t *msg_, int flags_); + int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); diff --git a/src/pipe.cpp b/src/pipe.cpp index 2af2dc2..36dc808 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -20,8 +20,6 @@ #include <new> -#include "../include/zmq.h" - #include "pipe.hpp" #include "likely.hpp" @@ -53,11 +51,12 @@ zmq::reader_t::~reader_t () zmq_assert (pipe); // First delete all the unread messages in the pipe. We have to do it by - // hand because zmq_msg_t is a POD, not a class, so there's no associated - // destructor. - zmq_msg_t msg; - while (pipe->read (&msg)) - zmq_msg_close (&msg); + // hand because msg_t doesn't have automatic destructor. + msg_t msg; + while (pipe->read (&msg)) { + int rc = msg.close (); + errno_assert (rc == 0); + } delete pipe; } @@ -68,11 +67,9 @@ void zmq::reader_t::set_event_sink (i_reader_events *sink_) sink = sink_; } -bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_) +bool zmq::reader_t::is_delimiter (msg_t &msg_) { - unsigned char *offset = 0; - - return msg_.content == (void*) (offset + ZMQ_DELIMITER); + return msg_.is_delimiter (); } bool zmq::reader_t::check_read () @@ -89,7 +86,7 @@ bool zmq::reader_t::check_read () // If the next item in the pipe is message delimiter, // initiate its termination. if (pipe->probe (is_delimiter)) { - zmq_msg_t msg; + msg_t msg; bool ok = pipe->read (&msg); zmq_assert (ok); if (sink) @@ -101,7 +98,7 @@ bool zmq::reader_t::check_read () return true; } -bool zmq::reader_t::read (zmq_msg_t *msg_) +bool zmq::reader_t::read (msg_t *msg_) { if (!active) return false; @@ -112,15 +109,14 @@ bool zmq::reader_t::read (zmq_msg_t *msg_) } // If delimiter was read, start termination process of the pipe. - unsigned char *offset = 0; - if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) { + if (msg_->is_delimiter ()) { if (sink) sink->delimited (this); terminate (); return false; } - if (!(msg_->flags & ZMQ_MSG_MORE)) + if (!(msg_->flags () & msg_t::more)) msgs_read++; if (lwm > 0 && msgs_read % lwm == 0) @@ -187,7 +183,7 @@ void zmq::writer_t::set_event_sink (i_writer_events *sink_) sink = sink_; } -bool zmq::writer_t::check_write (zmq_msg_t *msg_) +bool zmq::writer_t::check_write (msg_t *msg_) { // We've already checked and there's no space free for the new message. // There's no point in checking once again. @@ -202,13 +198,13 @@ bool zmq::writer_t::check_write (zmq_msg_t *msg_) return true; } -bool zmq::writer_t::write (zmq_msg_t *msg_) +bool zmq::writer_t::write (msg_t *msg_) { if (unlikely (!check_write (msg_))) return false; - pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); - if (!(msg_->flags & ZMQ_MSG_MORE)) + pipe->write (*msg_, msg_->flags () & msg_t::more); + if (!(msg_->flags () & msg_t::more)) msgs_written++; return true; @@ -217,10 +213,11 @@ bool zmq::writer_t::write (zmq_msg_t *msg_) void zmq::writer_t::rollback () { // Remove incomplete message from the pipe. - zmq_msg_t msg; + msg_t msg; while (pipe->unwrite (&msg)) { - zmq_assert (msg.flags & ZMQ_MSG_MORE); - zmq_msg_close (&msg); + zmq_assert (msg.flags () & msg_t::more); + int rc = msg.close (); + errno_assert (rc == 0); } } @@ -246,10 +243,8 @@ void zmq::writer_t::terminate () // Push delimiter into the pipe. Trick the compiler to belive that // the tag is a valid pointer. Note that watermarks are not checked // thus the delimiter can be written even though the pipe is full. - zmq_msg_t msg; - const unsigned char *offset = 0; - msg.content = (void*) (offset + ZMQ_DELIMITER); - msg.flags = 0; + msg_t msg; + msg.init_delimiter (); pipe->write (msg, false); flush (); } diff --git a/src/pipe.hpp b/src/pipe.hpp index 3230d02..75b5c47 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -21,8 +21,7 @@ #ifndef __ZMQ_PIPE_HPP_INCLUDED__ #define __ZMQ_PIPE_HPP_INCLUDED__ -#include "../include/zmq.h" - +#include "msg.hpp" #include "array.hpp" #include "ypipe.hpp" #include "config.hpp" @@ -43,7 +42,7 @@ namespace zmq // event. When endpoint processes the event and returns, associated // reader/writer object is deallocated. - typedef ypipe_t <zmq_msg_t, message_pipe_granularity> pipe_t; + typedef ypipe_t <msg_t, message_pipe_granularity> pipe_t; struct i_reader_events { @@ -69,7 +68,7 @@ namespace zmq bool check_read (); // Reads a message to the underlying pipe. - bool read (zmq_msg_t *msg_); + bool read (msg_t *msg_); // Ask pipe to terminate. void terminate (); @@ -87,7 +86,7 @@ namespace zmq void process_pipe_term_ack (); // Returns true if the message is delimiter; false otherwise. - static bool is_delimiter (zmq_msg_t &msg_); + static bool is_delimiter (msg_t &msg_); // True, if pipe can be read from. bool active; @@ -136,11 +135,11 @@ namespace zmq // Checks whether messages can be written to the pipe. // If writing the message would cause high watermark // the function returns false. - bool check_write (zmq_msg_t *msg_); + bool check_write (msg_t *msg_); // Writes a message to the underlying pipe. Returns false if the // message cannot be written because high watermark was reached. - bool write (zmq_msg_t *msg_); + bool write (msg_t *msg_); // Remove unfinished part of a message from the pipe. void rollback (); diff --git a/src/pub.cpp b/src/pub.cpp index 74f07fc..8558265 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -19,6 +19,7 @@ */ #include "pub.hpp" +#include "msg.hpp" zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_) : xpub_t (parent_, tid_) diff --git a/src/pull.cpp b/src/pull.cpp index a8c2466..b9d4433 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -18,10 +18,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "pull.hpp" #include "err.hpp" +#include "msg.hpp" zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_) : socket_base_t (parent_, tid_), @@ -49,7 +48,7 @@ void zmq::pull_t::process_term (int linger_) socket_base_t::process_term (linger_); } -int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_) +int zmq::pull_t::xrecv (msg_t *msg_, int flags_) { return fq.recv (msg_, flags_); } diff --git a/src/pull.hpp b/src/pull.hpp index 95084ba..ffc3fdb 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -39,7 +39,7 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - int xrecv (zmq_msg_t *msg_, int flags_); + int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); private: diff --git a/src/push.cpp b/src/push.cpp index 072994f..d6ee399 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -18,11 +18,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "push.hpp" -#include "err.hpp" #include "pipe.hpp" +#include "err.hpp" +#include "msg.hpp" zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_) : socket_base_t (parent_, tid_), @@ -50,7 +49,7 @@ void zmq::push_t::process_term (int linger_) socket_base_t::process_term (linger_); } -int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_) +int zmq::push_t::xsend (msg_t *msg_, int flags_) { return lb.send (msg_, flags_); } diff --git a/src/push.hpp b/src/push.hpp index f04b25f..c4d63f6 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -39,7 +39,7 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - int xsend (zmq_msg_t *msg_, int flags_); + int xsend (class msg_t *msg_, int flags_); bool xhas_out (); private: diff --git a/src/rep.cpp b/src/rep.cpp index 46c35cb..ef0defc 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -18,10 +18,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "rep.hpp" #include "err.hpp" +#include "msg.hpp" zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t tid_) : xrep_t (parent_, tid_), @@ -35,7 +34,7 @@ zmq::rep_t::~rep_t () { } -int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) +int zmq::rep_t::xsend (msg_t *msg_, int flags_) { // If we are in the middle of receiving a request, we cannot send reply. if (!sending_reply) { @@ -43,7 +42,7 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) return -1; } - bool more = (msg_->flags & ZMQ_MSG_MORE); + bool more = (msg_->flags () & msg_t::more); // Push message to the reply pipe. int rc = xrep_t::xsend (msg_, flags_); @@ -57,7 +56,7 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) return 0; } -int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) +int zmq::rep_t::xrecv (msg_t *msg_, int flags_) { // If we are in middle of sending a reply, we cannot receive next request. if (sending_reply) { @@ -78,10 +77,10 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) int rc = xrep_t::xrecv (msg_, flags_); if (rc != 0) return rc; - zmq_assert (msg_->flags & ZMQ_MSG_MORE); + zmq_assert (msg_->flags () & msg_t::more); // Empty message part delimits the traceback stack. - bottom = (zmq_msg_size (msg_) == 0); + bottom = (msg_->size () == 0); // Push it to the reply pipe. rc = xrep_t::xsend (msg_, flags_); @@ -98,7 +97,7 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) return rc; // If whole request is read, flip the FSM to reply-sending state. - if (!(msg_->flags & ZMQ_MSG_MORE)) { + if (!(msg_->flags () & msg_t::more)) { sending_reply = true; request_begins = true; } diff --git a/src/rep.hpp b/src/rep.hpp index d0dd9c8..a13853d 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -34,8 +34,8 @@ namespace zmq ~rep_t (); // Overloads of functions from socket_base_t. - int xsend (zmq_msg_t *msg_, int flags_); - int xrecv (zmq_msg_t *msg_, int flags_); + int xsend (class msg_t *msg_, int flags_); + int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); diff --git a/src/req.cpp b/src/req.cpp index 503f221..6bf502f 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -18,10 +18,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "req.hpp" #include "err.hpp" +#include "msg.hpp" zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) : xreq_t (parent_, tid_), @@ -35,7 +34,7 @@ zmq::req_t::~req_t () { } -int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) +int zmq::req_t::xsend (msg_t *msg_, int flags_) { // If we've sent a request and we still haven't got the reply, // we can't send another request. @@ -46,17 +45,17 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) // First part of the request is empty message part (stack bottom). if (message_begins) { - zmq_msg_t prefix; - int rc = zmq_msg_init (&prefix); - zmq_assert (rc == 0); - prefix.flags |= ZMQ_MSG_MORE; + msg_t prefix; + int rc = prefix.init (); + errno_assert (rc == 0); + prefix.set_flags (msg_t::more); rc = xreq_t::xsend (&prefix, flags_); if (rc != 0) return rc; message_begins = false; } - bool more = msg_->flags & ZMQ_MSG_MORE; + bool more = msg_->flags () & msg_t::more; int rc = xreq_t::xsend (msg_, flags_); if (rc != 0) @@ -71,7 +70,7 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) return 0; } -int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_) +int zmq::req_t::xrecv (msg_t *msg_, int flags_) { // If request wasn't send, we can't wait for reply. if (!receiving_reply) { @@ -84,8 +83,8 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_) int rc = xreq_t::xrecv (msg_, flags_); if (rc != 0) return rc; - zmq_assert (msg_->flags & ZMQ_MSG_MORE); - zmq_assert (zmq_msg_size (msg_) == 0); + zmq_assert (msg_->flags () & msg_t::more); + zmq_assert (msg_->size () == 0); message_begins = false; } @@ -94,7 +93,7 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_) return rc; // If the reply is fully received, flip the FSM into request-sending state. - if (!(msg_->flags & ZMQ_MSG_MORE)) { + if (!(msg_->flags () & msg_t::more)) { receiving_reply = false; message_begins = true; } diff --git a/src/req.hpp b/src/req.hpp index 3138498..e0554ac 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -34,8 +34,8 @@ namespace zmq ~req_t (); // Overloads of functions from socket_base_t. - int xsend (zmq_msg_t *msg_, int flags_); - int xrecv (zmq_msg_t *msg_, int flags_); + int xsend (class msg_t *msg_, int flags_); + int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); diff --git a/src/session.cpp b/src/session.cpp index 5f970cc..499fe40 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -80,7 +80,7 @@ void zmq::session_t::proceed_with_term () own_t::process_term (0); } -bool zmq::session_t::read (::zmq_msg_t *msg_) +bool zmq::session_t::read (msg_t *msg_) { if (!in_pipe) return false; @@ -88,14 +88,15 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) if (!in_pipe->read (msg_)) return false; - incomplete_in = msg_->flags & ZMQ_MSG_MORE; + incomplete_in = msg_->flags () & msg_t::more; return true; } -bool zmq::session_t::write (::zmq_msg_t *msg_) +bool zmq::session_t::write (msg_t *msg_) { if (out_pipe && out_pipe->write (msg_)) { - zmq_msg_init (msg_); + int rc = msg_->init (); + errno_assert (rc == 0); return true; } @@ -120,13 +121,15 @@ void zmq::session_t::clean_pipes () // Remove any half-read message from the in pipe. if (in_pipe) { while (incomplete_in) { - zmq_msg_t msg; - zmq_msg_init (&msg); + msg_t msg; + int rc = msg.init (); + errno_assert (rc == 0); if (!read (&msg)) { zmq_assert (!incomplete_in); break; } - zmq_msg_close (&msg); + rc = msg.close (); + errno_assert (rc == 0); } } } diff --git a/src/session.hpp b/src/session.hpp index 570daa1..d2f8882 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -45,8 +45,8 @@ namespace zmq // i_inout interface implementation. Note that detach method is not // implemented by generic session. Different session types may handle // engine disconnection in different ways. - bool read (::zmq_msg_t *msg_); - bool write (::zmq_msg_t *msg_); + bool read (msg_t *msg_); + bool write (msg_t *msg_); void flush (); void detach (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 9f3b1f6..d8af516 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -22,8 +22,6 @@ #include <string> #include <algorithm> -#include "../include/zmq.h" - #include "platform.hpp" #if defined ZMQ_HAVE_WINDOWS @@ -48,6 +46,7 @@ #include "platform.hpp" #include "likely.hpp" #include "uuid.hpp" +#include "msg.hpp" #include "pair.hpp" #include "pub.hpp" @@ -464,7 +463,7 @@ int zmq::socket_base_t::connect (const char *addr_) return 0; } -int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) +int zmq::socket_base_t::send (msg_t *msg_, int flags_) { // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { @@ -473,7 +472,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) } // Check whether message passed to the function is valid. - if (unlikely ((msg_->flags | ZMQ_MSG_MASK) != 0xff)) { + if (unlikely (!msg_->check ())) { errno = EFAULT; return -1; } @@ -485,7 +484,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) // At this point we impose the MORE flag on the message. if (flags_ & ZMQ_SNDMORE) - msg_->flags |= ZMQ_MSG_MORE; + msg_->set_flags (msg_t::more); // Try to send the message. rc = xsend (msg_, flags_); @@ -509,7 +508,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) return 0; } -int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) +int zmq::socket_base_t::recv (msg_t *msg_, int flags_) { // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { @@ -518,7 +517,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) } // Check whether message passed to the function is valid. - if (unlikely ((msg_->flags | ZMQ_MSG_MASK) != 0xff)) { + if (unlikely (!msg_->check ())) { errno = EFAULT; return -1; } @@ -543,9 +542,9 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) // If we have the message, return immediately. if (rc == 0) { - rcvmore = msg_->flags & ZMQ_MSG_MORE; + rcvmore = msg_->flags () & msg_t::more; if (rcvmore) - msg_->flags &= ~ZMQ_MSG_MORE; + msg_->reset_flags (msg_t::more); return 0; } @@ -565,9 +564,9 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) rc = xrecv (msg_, flags_); if (rc == 0) { - rcvmore = msg_->flags & ZMQ_MSG_MORE; + rcvmore = msg_->flags () & msg_t::more; if (rcvmore) - msg_->flags &= ~ZMQ_MSG_MORE; + msg_->reset_flags (msg_t::more); } return rc; } @@ -585,9 +584,9 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) block = true; } - rcvmore = msg_->flags & ZMQ_MSG_MORE; + rcvmore = msg_->flags () & msg_t::more; if (rcvmore) - msg_->flags &= ~ZMQ_MSG_MORE; + msg_->reset_flags (msg_t::more); return 0; } @@ -757,7 +756,7 @@ bool zmq::socket_base_t::xhas_out () return false; } -int zmq::socket_base_t::xsend (zmq_msg_t *msg_, int options_) +int zmq::socket_base_t::xsend (msg_t *msg_, int options_) { errno = ENOTSUP; return -1; @@ -768,7 +767,7 @@ bool zmq::socket_base_t::xhas_in () return false; } -int zmq::socket_base_t::xrecv (zmq_msg_t *msg_, int options_) +int zmq::socket_base_t::xrecv (msg_t *msg_, int options_) { errno = ENOTSUP; return -1; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 333cddd..0a5c574 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -24,8 +24,6 @@ #include <map> #include <vector> -#include "../include/zmq.h" - #include "own.hpp" #include "array.hpp" #include "mutex.hpp" @@ -69,8 +67,8 @@ namespace zmq int getsockopt (int option_, void *optval_, size_t *optvallen_); int bind (const char *addr_); int connect (const char *addr_); - int send (zmq_msg_t *msg_, int flags_); - int recv (zmq_msg_t *msg_, int flags_); + int send (class msg_t *msg_, int flags_); + int recv (class msg_t *msg_, int flags_); int close (); // These functions are used by the polling mechanism to determine @@ -123,11 +121,11 @@ namespace zmq // The default implementation assumes that send is not supported. virtual bool xhas_out (); - virtual int xsend (zmq_msg_t *msg_, int options_); + virtual int xsend (class msg_t *msg_, int options_); // The default implementation assumes that recv in not supported. virtual bool xhas_in (); - virtual int xrecv (zmq_msg_t *msg_, int options_); + virtual int xrecv (class msg_t *msg_, int options_); // We are declaring termination handler as protected so that // individual socket types can hook into the termination process diff --git a/src/sub.cpp b/src/sub.cpp index aef7369..2d6ade6 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -18,9 +18,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "sub.hpp" +#include "msg.hpp" zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) : xsub_t (parent_, tid_) @@ -41,9 +40,10 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_, } // Create the subscription message. - zmq_msg_t msg; - zmq_msg_init_size (&msg, optvallen_ + 1); - unsigned char *data = (unsigned char*) zmq_msg_data (&msg); + msg_t msg; + int rc = msg.init_size (optvallen_ + 1); + errno_assert (rc == 0); + unsigned char *data = (unsigned char*) msg.data (); if (option_ == ZMQ_SUBSCRIBE) *data = 1; else if (option_ == ZMQ_UNSUBSCRIBE) @@ -52,16 +52,17 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_, // Pass it further on in the stack. int err = 0; - int rc = xsub_t::xsend (&msg, 0); + rc = xsub_t::xsend (&msg, 0); if (rc != 0) err = errno; - zmq_msg_close (&msg); + int rc2 = msg.close (); + errno_assert (rc2 == 0); if (rc != 0) errno = err; return rc; } -int zmq::sub_t::xsend (zmq_msg_t *msg_, int options_) +int zmq::sub_t::xsend (msg_t *msg_, int options_) { // Overload the XSUB's send. errno = ENOTSUP; diff --git a/src/sub.hpp b/src/sub.hpp index d1f467d..8575961 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -36,7 +36,7 @@ namespace zmq protected: int xsetsockopt (int option_, const void *optval_, size_t optvallen_); - int xsend (zmq_msg_t *msg_, int options_); + int xsend (class msg_t *msg_, int options_); bool xhas_out (); private: diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 0c1581d..f00d478 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -22,8 +22,6 @@ #include <string> -#include "../include/zmq.h" - #include "tcp_connecter.hpp" #include "platform.hpp" #include "ip.hpp" diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 8de564f..f40b0fe 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -20,8 +20,6 @@ #include <string.h> -#include "../include/zmq.h" - #include "tcp_listener.hpp" #include "platform.hpp" #include "ip.hpp" diff --git a/src/xpub.cpp b/src/xpub.cpp index ed56183..2b5c4eb 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -18,11 +18,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "xpub.hpp" -#include "err.hpp" #include "pipe.hpp" +#include "err.hpp" +#include "msg.hpp" zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) : socket_base_t (parent_, tid_), @@ -53,7 +52,7 @@ void zmq::xpub_t::process_term (int linger_) socket_base_t::process_term (linger_); } -int zmq::xpub_t::xsend (zmq_msg_t *msg_, int flags_) +int zmq::xpub_t::xsend (msg_t *msg_, int flags_) { return dist.send (msg_, flags_); } @@ -63,7 +62,7 @@ bool zmq::xpub_t::xhas_out () return dist.has_out (); } -int zmq::xpub_t::xrecv (zmq_msg_t *msg_, int flags_) +int zmq::xpub_t::xrecv (msg_t *msg_, int flags_) { errno = EAGAIN; return -1; diff --git a/src/xpub.hpp b/src/xpub.hpp index e945198..19aa38a 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -39,9 +39,9 @@ namespace zmq // Implementations of virtual functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - int xsend (zmq_msg_t *msg_, int flags_); + int xsend (class msg_t *msg_, int flags_); bool xhas_out (); - int xrecv (zmq_msg_t *msg_, int flags_); + int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); private: diff --git a/src/xrep.cpp b/src/xrep.cpp index 75dc30e..5e01e2f 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -18,11 +18,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "xrep.hpp" -#include "err.hpp" #include "pipe.hpp" +#include "err.hpp" zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : socket_base_t (parent_, tid_), @@ -159,7 +157,7 @@ void zmq::xrep_t::activated (writer_t *pipe_) zmq_assert (false); } -int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) +int zmq::xrep_t::xsend (msg_t *msg_, int flags_) { // If this is the first part of the message it's the identity of the // peer to send the message to. @@ -168,44 +166,43 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) // If we have malformed message (prefix with no subsequent message) // then just silently ignore it. - if (msg_->flags & ZMQ_MSG_MORE) { + if (msg_->flags () & msg_t::more) { more_out = true; // Find the pipe associated with the identity stored in the prefix. // If there's no such pipe just silently ignore the message. - blob_t identity ((unsigned char*) zmq_msg_data (msg_), - zmq_msg_size (msg_)); + blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); outpipes_t::iterator it = outpipes.find (identity); if (it != outpipes.end ()) { current_out = it->second.writer; - zmq_msg_t empty; - int rc = zmq_msg_init (&empty); - zmq_assert (rc == 0); + msg_t empty; + int rc = empty.init (); + errno_assert (rc == 0); if (!current_out->check_write (&empty)) { it->second.active = false; more_out = false; current_out = NULL; - rc = zmq_msg_close (&empty); - zmq_assert (rc == 0); + rc = empty.close (); + errno_assert (rc == 0); errno = EAGAIN; return -1; } - rc = zmq_msg_close (&empty); - zmq_assert (rc == 0); + rc = empty.close (); + errno_assert (rc == 0); } } - int rc = zmq_msg_close (msg_); - zmq_assert (rc == 0); - rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); + errno_assert (rc == 0); return 0; } // Check whether this is the last part of the message. - more_out = msg_->flags & ZMQ_MSG_MORE; + more_out = msg_->flags () & msg_t::more; // Push the message into the pipe. If there's no out pipe, just drop it. if (current_out) { @@ -217,36 +214,38 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) } } else { - int rc = zmq_msg_close (msg_); - zmq_assert (rc == 0); + int rc = msg_->close (); + errno_assert (rc == 0); } // Detach the message from the data buffer. - int rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); + int rc = msg_->init (); + errno_assert (rc == 0); return 0; } -int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) +int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) { // If there is a prefetched message, return it. if (prefetched) { - zmq_msg_move (msg_, &prefetched_msg); - more_in = msg_->flags & ZMQ_MSG_MORE; + int rc = msg_->move (prefetched_msg); + errno_assert (rc == 0); + more_in = msg_->flags () & msg_t::more; prefetched = false; return 0; } // Deallocate old content of the message. - zmq_msg_close (msg_); + int rc = msg_->close (); + errno_assert (rc == 0); // If we are in the middle of reading a message, just grab next part of it. if (more_in) { zmq_assert (inpipes [current_in].active); bool fetched = inpipes [current_in].reader->read (msg_); zmq_assert (fetched); - more_in = msg_->flags & ZMQ_MSG_MORE; + more_in = msg_->flags () & msg_t::more; if (!more_in) { current_in++; if (current_in >= inpipes.size ()) @@ -264,12 +263,11 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) // If we have a message, create a prefix and return it to the caller. if (prefetched) { - int rc = zmq_msg_init_size (msg_, - inpipes [current_in].identity.size ()); - zmq_assert (rc == 0); - memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (), - zmq_msg_size (msg_)); - msg_->flags |= ZMQ_MSG_MORE; + int rc = msg_->init_size (inpipes [current_in].identity.size ()); + errno_assert (rc == 0); + memcpy (msg_->data (), inpipes [current_in].identity.data (), + msg_->size ()); + msg_->set_flags (msg_t::more); return 0; } @@ -283,7 +281,8 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) // No message is available. Initialise the output parameter // to be a 0-byte message. - zmq_msg_init (msg_); + rc = msg_->init (); + errno_assert (rc == 0); errno = EAGAIN; return -1; } diff --git a/src/xrep.hpp b/src/xrep.hpp index d7fbe9f..1c45655 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -27,6 +27,7 @@ #include "socket_base.hpp" #include "blob.hpp" #include "pipe.hpp" +#include "msg.hpp" namespace zmq { @@ -45,8 +46,8 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, const blob_t &peer_identity_); - int xsend (zmq_msg_t *msg_, int flags_); - int xrecv (zmq_msg_t *msg_, int flags_); + int xsend (class msg_t *msg_, int flags_); + int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); @@ -82,7 +83,7 @@ namespace zmq bool prefetched; // Holds the prefetched message. - zmq_msg_t prefetched_msg; + msg_t prefetched_msg; // If true, more incoming message parts are expected. bool more_in; diff --git a/src/xreq.cpp b/src/xreq.cpp index 96f1bba..2fda2c1 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -18,10 +18,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "xreq.hpp" #include "err.hpp" +#include "msg.hpp" zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : socket_base_t (parent_, tid_), @@ -52,12 +51,12 @@ void zmq::xreq_t::process_term (int linger_) socket_base_t::process_term (linger_); } -int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) +int zmq::xreq_t::xsend (msg_t *msg_, int flags_) { return lb.send (msg_, flags_); } -int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_) +int zmq::xreq_t::xrecv (msg_t *msg_, int flags_) { return fq.recv (msg_, flags_); } diff --git a/src/xreq.hpp b/src/xreq.hpp index 73af21f..e0cafe5 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -41,8 +41,8 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - int xsend (zmq_msg_t *msg_, int flags_); - int xrecv (zmq_msg_t *msg_, int flags_); + int xsend (class msg_t *msg_, int flags_); + int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); diff --git a/src/xsub.cpp b/src/xsub.cpp index b0c5795..b0e8cd2 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -20,8 +20,6 @@ #include <string.h> -#include "../include/zmq.h" - #include "xsub.hpp" #include "err.hpp" @@ -34,12 +32,14 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) : options.type = ZMQ_XSUB; options.requires_in = true; options.requires_out = false; - zmq_msg_init (&message); + int rc = message.init (); + errno_assert (rc == 0); } zmq::xsub_t::~xsub_t () { - zmq_msg_close (&message); + int rc = message.close (); + errno_assert (rc == 0); } void zmq::xsub_t::xattach_pipes (class reader_t *inpipe_, @@ -55,10 +55,10 @@ void zmq::xsub_t::process_term (int linger_) socket_base_t::process_term (linger_); } -int zmq::xsub_t::xsend (zmq_msg_t *msg_, int options_) +int zmq::xsub_t::xsend (msg_t *msg_, int options_) { - size_t size = zmq_msg_size (msg_); - unsigned char *data = (unsigned char*) zmq_msg_data (msg_); + size_t size = msg_->size (); + unsigned char *data = (unsigned char*) msg_->data (); // Malformed subscriptions are dropped silently. if (size >= 1) { @@ -72,10 +72,10 @@ int zmq::xsub_t::xsend (zmq_msg_t *msg_, int options_) subscriptions.rm (data + 1, size - 1); } - int rc = zmq_msg_close (msg_); - zmq_assert (rc == 0); - rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); + errno_assert (rc == 0); return 0; } @@ -85,14 +85,15 @@ bool zmq::xsub_t::xhas_out () return true; } -int zmq::xsub_t::xrecv (zmq_msg_t *msg_, int flags_) +int zmq::xsub_t::xrecv (msg_t *msg_, int flags_) { // If there's already a message prepared by a previous call to zmq_poll, // return it straight ahead. if (has_message) { - zmq_msg_move (msg_, &message); + int rc = msg_->move (message); + errno_assert (rc == 0); has_message = false; - more = msg_->flags & ZMQ_MSG_MORE; + more = msg_->flags () & msg_t::more; return 0; } @@ -112,13 +113,13 @@ int zmq::xsub_t::xrecv (zmq_msg_t *msg_, int flags_) // Check whether the message matches at least one subscription. // Non-initial parts of the message are passed if (more || match (msg_)) { - more = msg_->flags & ZMQ_MSG_MORE; + more = msg_->flags () & msg_t::more; return 0; } // Message doesn't match. Pop any remaining parts of the message // from the pipe. - while (msg_->flags & ZMQ_MSG_MORE) { + while (msg_->flags () & msg_t::more) { rc = fq.recv (msg_, ZMQ_DONTWAIT); zmq_assert (rc == 0); } @@ -158,15 +159,15 @@ bool zmq::xsub_t::xhas_in () // Message doesn't match. Pop any remaining parts of the message // from the pipe. - while (message.flags & ZMQ_MSG_MORE) { + while (message.flags () & msg_t::more) { rc = fq.recv (&message, ZMQ_DONTWAIT); zmq_assert (rc == 0); } } } -bool zmq::xsub_t::match (zmq_msg_t *msg_) +bool zmq::xsub_t::match (msg_t *msg_) { - return subscriptions.check ((unsigned char*) zmq_msg_data (msg_), - zmq_msg_size (msg_)); + return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ()); } + diff --git a/src/xsub.hpp b/src/xsub.hpp index 6bd55ad..202a29f 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -21,10 +21,9 @@ #ifndef __ZMQ_XSUB_HPP_INCLUDED__ #define __ZMQ_XSUB_HPP_INCLUDED__ -#include "../include/zmq.h" - #include "trie.hpp" #include "socket_base.hpp" +#include "msg.hpp" #include "fq.hpp" namespace zmq @@ -42,9 +41,9 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - int xsend (zmq_msg_t *msg_, int options_); + int xsend (class msg_t *msg_, int options_); bool xhas_out (); - int xrecv (zmq_msg_t *msg_, int flags_); + int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); private: @@ -53,7 +52,7 @@ namespace zmq void process_term (int linger_); // Check whether the message matches at least one subscription. - bool match (zmq_msg_t *msg_); + bool match (class msg_t *msg_); // Fair queueing object for inbound pipes. fq_t fq; @@ -64,7 +63,7 @@ namespace zmq // If true, 'message' contains a matching message to return on the // next recv call. bool has_message; - zmq_msg_t message; + msg_t message; // If true, part of a multipart message was already received, but // there are following parts still waiting. diff --git a/src/zmq.cpp b/src/zmq.cpp index eb8cc40..b40d8b2 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -32,8 +32,6 @@ #include <poll.h> #endif -#include "../include/zmq.h" - #include <string.h> #include <errno.h> #include <stdlib.h> @@ -46,6 +44,7 @@ #include "clock.hpp" #include "ctx.hpp" #include "err.hpp" +#include "msg.hpp" #include "fd.hpp" #if !defined ZMQ_HAVE_WINDOWS @@ -57,6 +56,10 @@ #include <pgm/pgm.h> #endif +// Compile time check whether msg_t fits into zmq_msg_t. +typedef char check_msg_t_size + [sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1]; + void zmq_version (int *major_, int *minor_, int *patch_) { *major_ = ZMQ_VERSION_MAJOR; @@ -260,7 +263,7 @@ int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_) return -1; } int sz = (int) zmq_msg_size (msg_); - int rc = (((zmq::socket_base_t*) s_)->send (msg_, flags_)); + int rc = (((zmq::socket_base_t*) s_)->send ((zmq::msg_t*) msg_, flags_)); if (unlikely (rc < 0)) return -1; return sz; @@ -272,12 +275,53 @@ int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_) errno = ENOTSOCK; return -1; } - int rc = (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); + int rc = (((zmq::socket_base_t*) s_)->recv ((zmq::msg_t*) msg_, flags_)); if (unlikely (rc < 0)) return -1; return (int) zmq_msg_size (msg_); } +int zmq_msg_init (zmq_msg_t *msg_) +{ + return ((zmq::msg_t*) msg_)->init (); +} + +int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) +{ + return ((zmq::msg_t*) msg_)->init_size (size_); +} + +int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, + zmq_free_fn *ffn_, void *hint_) +{ + return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_); +} + +int zmq_msg_close (zmq_msg_t *msg_) +{ + return ((zmq::msg_t*) msg_)->close (); +} + +int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_) +{ + return ((zmq::msg_t*) dest_)->move (*(zmq::msg_t*) src_); +} + +int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) +{ + return ((zmq::msg_t*) dest_)->copy (*(zmq::msg_t*) src_); +} + +void *zmq_msg_data (zmq_msg_t *msg_) +{ + return ((zmq::msg_t*) msg_)->data (); +} + +size_t zmq_msg_size (zmq_msg_t *msg_) +{ + return ((zmq::msg_t*) msg_)->size (); +} + #if defined ZMQ_FORCE_SELECT #define ZMQ_POLL_BASED_ON_SELECT #elif defined ZMQ_FORCE_POLL diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index 5ca2367..ca7c66d 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -53,28 +53,27 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, peer_identity.assign (identity, uuid_t::uuid_blob_len + 1); // Create a list of props to send. - - zmq_msg_t msg; - int rc = zmq_msg_init_size (&msg, 4); + msg_t msg; + int rc = msg.init_size (4); errno_assert (rc == 0); - unsigned char *data = (unsigned char*) zmq_msg_data (&msg); + unsigned char *data = (unsigned char*) msg.data (); put_uint16 (data, prop_type); put_uint16 (data + 2, options.type); - msg.flags |= ZMQ_MSG_MORE; + msg.set_flags (msg_t::more); to_send.push_back (msg); if (!options.identity.empty ()) { - rc = zmq_msg_init_size (&msg, 2 + options.identity.size ()); + rc = msg.init_size (2 + options.identity.size ()); errno_assert (rc == 0); - data = (unsigned char*) zmq_msg_data (&msg); + data = (unsigned char*) msg.data (); put_uint16 (data, prop_identity); memcpy (data + 2, options.identity.data (), options.identity.size ()); - msg.flags |= ZMQ_MSG_MORE; + msg.set_flags (msg_t::more); to_send.push_back (msg); } // Remove the MORE flag from the last prop. - to_send.back ().flags &= ~ZMQ_MSG_MORE; + to_send.back ().reset_flags (msg_t::more); } zmq::zmq_init_t::~zmq_init_t () @@ -85,13 +84,13 @@ zmq::zmq_init_t::~zmq_init_t () // If there are unsent props still queued deallocate them. for (to_send_t::iterator it = to_send.begin (); it != to_send.end (); ++it) { - int rc = zmq_msg_close (&(*it)); + int rc = it->close (); errno_assert (rc == 0); } to_send.clear (); } -bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) +bool zmq::zmq_init_t::read (msg_t *msg_) { // If the identity was already sent, do nothing. if (to_send.empty ()) @@ -107,15 +106,15 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) return true; } -bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) +bool zmq::zmq_init_t::write (msg_t *msg_) { // If identity was already received, we are not interested // in subsequent messages. if (received) return false; - size_t size = zmq_msg_size (msg_); - unsigned char *data = (unsigned char*) zmq_msg_data (msg_); + size_t size = msg_->size (); + unsigned char *data = (unsigned char*) msg_->data (); // There should be at least property type in the message. zmq_assert (size >= 2); @@ -139,7 +138,7 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) zmq_assert (false); } - if (!(msg_->flags & ZMQ_MSG_MORE)) { + if (!(msg_->flags () & msg_t::more)) { received = true; finalise_initialisation (); } diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp index 92ab05b..ec9b2b3 100644 --- a/src/zmq_init.hpp +++ b/src/zmq_init.hpp @@ -23,14 +23,13 @@ #include <vector> -#include "../include/zmq.h" - #include "i_inout.hpp" #include "i_engine.hpp" -#include "own.hpp" -#include "fd.hpp" #include "stdint.hpp" #include "blob.hpp" +#include "msg.hpp" +#include "own.hpp" +#include "fd.hpp" namespace zmq { @@ -58,8 +57,8 @@ namespace zmq void dispatch_engine (); // i_inout interface implementation. - bool read (::zmq_msg_t *msg_); - bool write (::zmq_msg_t *msg_); + bool read (class msg_t *msg_); + bool write (class msg_t *msg_); void flush (); void detach (); @@ -75,7 +74,7 @@ namespace zmq // List of messages to send to the peer during the connection // initiation phase. - typedef std::vector < ::zmq_msg_t> to_send_t; + typedef std::vector <msg_t> to_send_t; to_send_t to_send; // True if peer's identity was already received. |