summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-04-21 22:27:48 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-04-21 22:27:48 +0200
commite0246e32d79d71f8e73207b43aed8b23648e4fc7 (patch)
tree9952ee6fd39f4e27bbe932f6b6f30f0073009369
parent581697695aac72894f2d3fefac904b9d50b3ba67 (diff)
Message-related functionality factored out into msg_t class.
This patch addresses serveral issues: 1. It gathers message related functionality scattered over whole codebase into a single class. 2. It makes zmq_msg_t an opaque datatype. Internals of the class don't pollute zmq.h header file. 3. zmq_msg_t size decreases from 48 to 32 bytes. That saves ~33% of memory in scenarios with large amount of small messages. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--include/zmq.h29
-rw-r--r--src/config.hpp4
-rw-r--r--src/ctx.cpp14
-rw-r--r--src/ctx.hpp2
-rw-r--r--src/decoder.cpp18
-rw-r--r--src/decoder.hpp5
-rw-r--r--src/dist.cpp66
-rw-r--r--src/dist.hpp6
-rw-r--r--src/encoder.cpp24
-rw-r--r--src/encoder.hpp5
-rw-r--r--src/err.cpp2
-rw-r--r--src/err.hpp3
-rw-r--r--src/fq.cpp13
-rw-r--r--src/fq.hpp3
-rw-r--r--src/i_inout.hpp7
-rw-r--r--src/io_thread.cpp2
-rw-r--r--src/ip.cpp4
-rw-r--r--src/lb.cpp28
-rw-r--r--src/lb.hpp2
-rw-r--r--src/msg.cpp249
-rw-r--r--src/msg.hpp107
-rw-r--r--src/object.hpp2
-rw-r--r--src/options.cpp2
-rw-r--r--src/pair.cpp25
-rw-r--r--src/pair.hpp4
-rw-r--r--src/pipe.cpp49
-rw-r--r--src/pipe.hpp13
-rw-r--r--src/pub.cpp1
-rw-r--r--src/pull.cpp5
-rw-r--r--src/pull.hpp2
-rw-r--r--src/push.cpp7
-rw-r--r--src/push.hpp2
-rw-r--r--src/rep.cpp15
-rw-r--r--src/rep.hpp4
-rw-r--r--src/req.cpp23
-rw-r--r--src/req.hpp4
-rw-r--r--src/session.cpp17
-rw-r--r--src/session.hpp4
-rw-r--r--src/socket_base.cpp29
-rw-r--r--src/socket_base.hpp10
-rw-r--r--src/sub.cpp17
-rw-r--r--src/sub.hpp2
-rw-r--r--src/tcp_connecter.cpp2
-rw-r--r--src/tcp_listener.cpp2
-rw-r--r--src/xpub.cpp9
-rw-r--r--src/xpub.hpp4
-rw-r--r--src/xrep.cpp69
-rw-r--r--src/xrep.hpp7
-rw-r--r--src/xreq.cpp7
-rw-r--r--src/xreq.hpp4
-rw-r--r--src/xsub.cpp41
-rw-r--r--src/xsub.hpp11
-rw-r--r--src/zmq.cpp52
-rw-r--r--src/zmq_init.cpp29
-rw-r--r--src/zmq_init.hpp13
55 files changed, 606 insertions, 474 deletions
diff --git a/include/zmq.h b/include/zmq.h
index 70197c9..2d01e24 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -121,34 +121,7 @@ ZMQ_EXPORT const char *zmq_strerror (int errnum);
/* 0MQ message definition. */
/******************************************************************************/
-/* Maximal size of "Very Small Message". VSMs are passed by value */
-/* to avoid excessive memory allocation/deallocation. */
-/* If VMSs larger than 255 bytes are required, type of 'vsm_size' */
-/* field in zmq_msg_t structure should be modified accordingly. */
-#define ZMQ_MAX_VSM_SIZE 30
-
-/* Message types. These integers may be stored in 'content' member of the */
-/* message instead of regular pointer to the data. */
-#define ZMQ_DELIMITER 31
-#define ZMQ_VSM 32
-
-/* Message flags. ZMQ_MSG_SHARED is strictly speaking not a message flag */
-/* (it has no equivalent in the wire format), however, making it a flag */
-/* allows us to pack the stucture tigher and thus improve performance. */
-#define ZMQ_MSG_MORE 1
-#define ZMQ_MSG_SHARED 128
-#define ZMQ_MSG_MASK 129 /* Merges all the flags */
-
-/* A message. Note that 'content' is not a pointer to the raw data. */
-/* Rather it is pointer to zmq::msg_content_t structure */
-/* (see src/msg_content.hpp for its definition). */
-typedef struct
-{
- void *content;
- unsigned char flags;
- unsigned char vsm_size;
- unsigned char vsm_data [ZMQ_MAX_VSM_SIZE];
-} zmq_msg_t;
+typedef unsigned char zmq_msg_t [32];
typedef void (zmq_free_fn) (void *data, void *hint);
diff --git a/src/config.hpp b/src/config.hpp
index 3df66c7..dff3f87 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -36,6 +36,10 @@ namespace zmq
// memory allocation by approximately 99.6%
message_pipe_granularity = 256,
+ // Size in bytes of the largest message that is still copied around
+ // rather than being reference-counted.
+ max_vsm_size = 29,
+
// Determines how often does socket poll for new commands when it
// still has unprocessed messages to handle. Thus, if it is set to 100,
// socket will process 100 inbound messages before doing the poll.
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 2758729..fb5420d 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -26,8 +26,9 @@
#include "io_thread.hpp"
#include "platform.hpp"
#include "reaper.hpp"
-#include "err.hpp"
#include "pipe.hpp"
+#include "err.hpp"
+#include "msg.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.h"
@@ -304,10 +305,10 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
void zmq::ctx_t::log (const char *format_, va_list args_)
{
// Create the log message.
- zmq_msg_t msg;
- int rc = zmq_msg_init_size (&msg, strlen (format_) + 1);
- zmq_assert (rc == 0);
- memcpy (zmq_msg_data (&msg), format_, zmq_msg_size (&msg));
+ msg_t msg;
+ int rc = msg.init_size (strlen (format_) + 1);
+ errno_assert (rc == 0);
+ memcpy (msg.data (), format_, msg.size ());
// At this point we migrate the log socket to the current thread.
// We rely on mutex for executing the memory barrier.
@@ -316,7 +317,8 @@ void zmq::ctx_t::log (const char *format_, va_list args_)
log_socket->send (&msg, 0);
log_sync.unlock ();
- zmq_msg_close (&msg);
+ rc = msg.close ();
+ errno_assert (rc == 0);
}
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 33d5dad..7d865fa 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -26,8 +26,6 @@
#include <string>
#include <stdarg.h>
-#include "../include/zmq.h"
-
#include "mailbox.hpp"
#include "semaphore.hpp"
#include "ypipe.hpp"
diff --git a/src/decoder.cpp b/src/decoder.cpp
index efb39e8..bcf5974 100644
--- a/src/decoder.cpp
+++ b/src/decoder.cpp
@@ -31,7 +31,8 @@ zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
destination (NULL),
maxmsgsize (maxmsgsize_)
{
- zmq_msg_init (&in_progress);
+ int rc = in_progress.init ();
+ errno_assert (rc == 0);
// At the beginning, read one byte and go to one_byte_size_ready state.
next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
@@ -39,7 +40,8 @@ zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
zmq::decoder_t::~decoder_t ()
{
- zmq_msg_close (&in_progress);
+ int rc = in_progress.close ();
+ errno_assert (rc == 0);
}
void zmq::decoder_t::set_inout (i_inout *destination_)
@@ -71,9 +73,9 @@ bool zmq::decoder_t::one_byte_size_ready ()
errno = ENOMEM;
}
else
- rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1);
+ rc = in_progress.init_size (*tmpbuf - 1);
if (rc != 0 && errno == ENOMEM) {
- rc = zmq_msg_init (&in_progress);
+ rc = in_progress.init ();
errno_assert (rc == 0);
decoding_error ();
return false;
@@ -106,9 +108,9 @@ bool zmq::decoder_t::eight_byte_size_ready ()
errno = ENOMEM;
}
else
- rc = zmq_msg_init_size (&in_progress, size - 1);
+ rc = in_progress.init_size (size - 1);
if (rc != 0 && errno == ENOMEM) {
- rc = zmq_msg_init (&in_progress);
+ rc = in_progress.init ();
errno_assert (rc == 0);
decoding_error ();
return false;
@@ -122,9 +124,9 @@ bool zmq::decoder_t::eight_byte_size_ready ()
bool zmq::decoder_t::flags_ready ()
{
// Store the flags from the wire into the message structure.
- in_progress.flags = tmpbuf [0];
+ in_progress.set_flags (tmpbuf [0]);
- next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
+ next_step (in_progress.data (), in_progress.size (),
&decoder_t::message_ready);
return true;
diff --git a/src/decoder.hpp b/src/decoder.hpp
index 23806a3..114ecef 100644
--- a/src/decoder.hpp
+++ b/src/decoder.hpp
@@ -27,10 +27,9 @@
#include <algorithm>
#include "err.hpp"
+#include "msg.hpp"
#include "stdint.hpp"
-#include "../include/zmq.h"
-
namespace zmq
{
@@ -196,7 +195,7 @@ namespace zmq
struct i_inout *destination;
unsigned char tmpbuf [8];
- ::zmq_msg_t in_progress;
+ msg_t in_progress;
int64_t maxmsgsize;
diff --git a/src/dist.cpp b/src/dist.cpp
index 9d50368..093da79 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -18,8 +18,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "dist.hpp"
#include "pipe.hpp"
#include "err.hpp"
@@ -89,10 +87,10 @@ void zmq::dist_t::activated (writer_t *pipe_)
active++;
}
-int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
+int zmq::dist_t::send (msg_t *msg_, int flags_)
{
// Is this end of a multipart message?
- bool msg_more = msg_->flags & ZMQ_MSG_MORE;
+ bool msg_more = msg_->flags () & msg_t::more;
// Push the message to active pipes.
distribute (msg_, flags_);
@@ -106,63 +104,33 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
return 0;
}
-void zmq::dist_t::distribute (zmq_msg_t *msg_, int flags_)
+void zmq::dist_t::distribute (msg_t *msg_, int flags_)
{
// If there are no active pipes available, simply drop the message.
if (active == 0) {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return;
- }
-
- msg_content_t *content = (msg_content_t*) msg_->content;
-
- // For VSMs the copying is straighforward.
- if (content == (msg_content_t*) ZMQ_VSM) {
- for (pipes_t::size_type i = 0; i < active;)
- if (write (pipes [i], msg_))
- i++;
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return;
- }
-
- // Optimisation for the case when there's only a single pipe
- // to send the message to - no refcount adjustment i.e. no atomic
- // operations are needed.
- if (active == 1) {
- if (!write (pipes [0], msg_)) {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- }
- int rc = zmq_msg_init (msg_);
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ rc = msg_->init ();
zmq_assert (rc == 0);
return;
}
- // There are at least 2 destinations for the message. That means we have
- // to deal with reference counting. First add N-1 references to
- // the content (we are holding one reference anyway, that's why -1).
- if (msg_->flags & ZMQ_MSG_SHARED)
- content->refcnt.add (active - 1);
- else {
- content->refcnt.set (active);
- msg_->flags |= ZMQ_MSG_SHARED;
- }
+ // Add active-1 references to the message. We already hold one reference,
+ // that's why -1.
+ msg_->add_refs (active - 1);
- // Push the message to all destinations.
+ // Push copy of the message to each active pipe.
for (pipes_t::size_type i = 0; i < active;) {
if (!write (pipes [i], msg_))
- content->refcnt.sub (1);
+ msg_->rm_refs (1);
else
i++;
}
- // Detach the original message from the data buffer.
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
+ // Detach the original message from the data buffer. Note that we don't
+ // close the message. That's because we've already used all the references.
+ int rc = msg_->init ();
+ errno_assert (rc == 0);
}
bool zmq::dist_t::has_out ()
@@ -170,14 +138,14 @@ bool zmq::dist_t::has_out ()
return true;
}
-bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_)
+bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_)
{
if (!pipe_->write (msg_)) {
active--;
pipes.swap (pipes.index (pipe_), active);
return false;
}
- if (!(msg_->flags & ZMQ_MSG_MORE))
+ if (!(msg_->flags () & msg_t::more))
pipe_->flush ();
return true;
}
diff --git a/src/dist.hpp b/src/dist.hpp
index ad9767a..ea05305 100644
--- a/src/dist.hpp
+++ b/src/dist.hpp
@@ -40,7 +40,7 @@ namespace zmq
void attach (writer_t *pipe_);
void terminate ();
- int send (zmq_msg_t *msg_, int flags_);
+ int send (class msg_t *msg_, int flags_);
bool has_out ();
// i_writer_events interface implementation.
@@ -51,10 +51,10 @@ namespace zmq
// Write the message to the pipe. Make the pipe inactive if writing
// fails. In such a case false is returned.
- bool write (class writer_t *pipe_, zmq_msg_t *msg_);
+ bool write (class writer_t *pipe_, class msg_t *msg_);
// Put the message to all active pipes.
- void distribute (zmq_msg_t *msg_, int flags_);
+ void distribute (class msg_t *msg_, int flags_);
// Plug in all the delayed pipes.
void clear_new_pipes ();
diff --git a/src/encoder.cpp b/src/encoder.cpp
index 88e1dff..a42f06f 100644
--- a/src/encoder.cpp
+++ b/src/encoder.cpp
@@ -26,7 +26,8 @@ zmq::encoder_t::encoder_t (size_t bufsize_) :
encoder_base_t <encoder_t> (bufsize_),
source (NULL)
{
- zmq_msg_init (&in_progress);
+ int rc = in_progress.init ();
+ errno_assert (rc == 0);
// Write 0 bytes to the batch and go to message_ready state.
next_step (NULL, 0, &encoder_t::message_ready, true);
@@ -34,7 +35,8 @@ zmq::encoder_t::encoder_t (size_t bufsize_) :
zmq::encoder_t::~encoder_t ()
{
- zmq_msg_close (&in_progress);
+ int rc = in_progress.close ();
+ errno_assert (rc == 0);
}
void zmq::encoder_t::set_inout (i_inout *source_)
@@ -45,7 +47,7 @@ void zmq::encoder_t::set_inout (i_inout *source_)
bool zmq::encoder_t::size_ready ()
{
// Write message body into the buffer.
- next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
+ next_step (in_progress.data (), in_progress.size (),
&encoder_t::message_ready, false);
return true;
}
@@ -53,19 +55,21 @@ bool zmq::encoder_t::size_ready ()
bool zmq::encoder_t::message_ready ()
{
// Destroy content of the old message.
- zmq_msg_close (&in_progress);
+ int rc = in_progress.close ();
+ errno_assert (rc == 0);
// Read new message. If there is none, return false.
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
if (!source || !source->read (&in_progress)) {
- zmq_msg_init (&in_progress);
+ rc = in_progress.init ();
+ errno_assert (rc == 0);
return false;
}
// Get the message size.
- size_t size = zmq_msg_size (&in_progress);
+ size_t size = in_progress.size ();
// Account for the 'flags' byte.
size++;
@@ -75,16 +79,16 @@ bool zmq::encoder_t::message_ready ()
// message size. In both cases 'flags' field follows.
if (size < 255) {
tmpbuf [0] = (unsigned char) size;
- tmpbuf [1] = (in_progress.flags & ~ZMQ_MSG_SHARED);
+ tmpbuf [1] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 2, &encoder_t::size_ready,
- !(in_progress.flags & ZMQ_MSG_MORE));
+ !(in_progress.flags () & msg_t::more));
}
else {
tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size);
- tmpbuf [9] = (in_progress.flags & ~ZMQ_MSG_SHARED);
+ tmpbuf [9] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 10, &encoder_t::size_ready,
- !(in_progress.flags & ZMQ_MSG_MORE));
+ !(in_progress.flags () & msg_t::more));
}
return true;
}
diff --git a/src/encoder.hpp b/src/encoder.hpp
index 918ec2b..617b65b 100644
--- a/src/encoder.hpp
+++ b/src/encoder.hpp
@@ -27,8 +27,7 @@
#include <algorithm>
#include "err.hpp"
-
-#include "../include/zmq.h"
+#include "msg.hpp"
namespace zmq
{
@@ -172,7 +171,7 @@ namespace zmq
bool message_ready ();
struct i_inout *source;
- ::zmq_msg_t in_progress;
+ msg_t in_progress;
unsigned char tmpbuf [10];
encoder_t (const encoder_t&);
diff --git a/src/err.cpp b/src/err.cpp
index 8761c22..87a0006 100644
--- a/src/err.cpp
+++ b/src/err.cpp
@@ -18,8 +18,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "err.hpp"
#include "platform.hpp"
diff --git a/src/err.hpp b/src/err.hpp
index 3ffd99d..6289a08 100644
--- a/src/err.hpp
+++ b/src/err.hpp
@@ -21,6 +21,9 @@
#ifndef __ZMQ_ERR_HPP_INCLUDED__
#define __ZMQ_ERR_HPP_INCLUDED__
+// 0MQ-specific error codes are defined in zmq.h
+#include "../include/zmq.h"
+
#include <assert.h>
#include <errno.h>
#include <string.h>
diff --git a/src/fq.cpp b/src/fq.cpp
index 36fd435..20dc769 100644
--- a/src/fq.cpp
+++ b/src/fq.cpp
@@ -18,12 +18,11 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "fq.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "own.hpp"
+#include "msg.hpp"
zmq::fq_t::fq_t (own_t *sink_) :
active (0),
@@ -95,10 +94,11 @@ void zmq::fq_t::activated (reader_t *pipe_)
active++;
}
-int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
+int zmq::fq_t::recv (msg_t *msg_, int flags_)
{
// Deallocate old content of the message.
- zmq_msg_close (msg_);
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
// Round-robin over the pipes to get the next message.
for (int count = active; count != 0; count--) {
@@ -116,7 +116,7 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
// and replaced by another active pipe. Thus we don't have to increase
// the 'current' pointer.
if (fetched) {
- more = msg_->flags & ZMQ_MSG_MORE;
+ more = msg_->flags () & msg_t::more;
if (!more) {
current++;
if (current >= active)
@@ -134,7 +134,8 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
// No message is available. Initialise the output parameter
// to be a 0-byte message.
- zmq_msg_init (msg_);
+ rc = msg_->init ();
+ errno_assert (rc == 0);
errno = EAGAIN;
return -1;
}
diff --git a/src/fq.hpp b/src/fq.hpp
index 8c6c95c..c35d458 100644
--- a/src/fq.hpp
+++ b/src/fq.hpp
@@ -23,6 +23,7 @@
#include "array.hpp"
#include "pipe.hpp"
+#include "msg.hpp"
namespace zmq
{
@@ -40,7 +41,7 @@ namespace zmq
void attach (reader_t *pipe_);
void terminate ();
- int recv (zmq_msg_t *msg_, int flags_);
+ int recv (msg_t *msg_, int flags_);
bool has_in ();
// i_reader_events implementation.
diff --git a/src/i_inout.hpp b/src/i_inout.hpp
index 057b46c..3f8e8e0 100644
--- a/src/i_inout.hpp
+++ b/src/i_inout.hpp
@@ -21,8 +21,7 @@
#ifndef __ZMQ_I_INOUT_HPP_INCLUDED__
#define __ZMQ_I_INOUT_HPP_INCLUDED__
-#include "../include/zmq.h"
-
+#include "msg.hpp"
#include "stdint.hpp"
namespace zmq
@@ -33,10 +32,10 @@ namespace zmq
virtual ~i_inout () {}
// Engine asks for a message to send to the network.
- virtual bool read (::zmq_msg_t *msg_) = 0;
+ virtual bool read (msg_t *msg_) = 0;
// Engine received message from the network and sends it further on.
- virtual bool write (::zmq_msg_t *msg_) = 0;
+ virtual bool write (msg_t *msg_) = 0;
// Flush all the previously written messages.
virtual void flush () = 0;
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index be52bdd..9678392 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -20,8 +20,6 @@
#include <new>
-#include "../include/zmq.h"
-
#include "io_thread.hpp"
#include "platform.hpp"
#include "err.hpp"
diff --git a/src/ip.cpp b/src/ip.cpp
index a63a97d..3591f02 100644
--- a/src/ip.cpp
+++ b/src/ip.cpp
@@ -23,11 +23,9 @@
#include <stdlib.h>
#include <string>
-#include "../include/zmq.h"
-
#include "ip.hpp"
-#include "platform.hpp"
#include "err.hpp"
+#include "platform.hpp"
#include "stdint.hpp"
#if defined ZMQ_HAVE_SOLARIS
diff --git a/src/lb.cpp b/src/lb.cpp
index 95af4a1..e81df3a 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -18,12 +18,11 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "lb.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "own.hpp"
+#include "msg.hpp"
zmq::lb_t::lb_t (own_t *sink_) :
active (0),
@@ -93,26 +92,26 @@ void zmq::lb_t::activated (writer_t *pipe_)
active++;
}
-int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
+int zmq::lb_t::send (msg_t *msg_, int flags_)
{
// Drop the message if required. If we are at the end of the message
// switch back to non-dropping mode.
if (dropping) {
- more = msg_->flags & ZMQ_MSG_MORE;
+ more = msg_->flags () & msg_t::more;
if (!more)
dropping = false;
- int rc = zmq_msg_close (msg_);
+ int rc = msg_->close ();
errno_assert (rc == 0);
- rc = zmq_msg_init (msg_);
+ rc = msg_->init ();
zmq_assert (rc == 0);
return 0;
}
while (active > 0) {
if (pipes [current]->write (msg_)) {
- more = msg_->flags & ZMQ_MSG_MORE;
+ more = msg_->flags () & msg_t::more;
break;
}
@@ -138,8 +137,8 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
}
// Detach the message from the data buffer.
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
+ int rc = msg_->init ();
+ errno_assert (rc == 0);
return 0;
}
@@ -154,13 +153,16 @@ bool zmq::lb_t::has_out ()
while (active > 0) {
// Check whether zero-sized message can be written to the pipe.
- zmq_msg_t msg;
- zmq_msg_init (&msg);
+ msg_t msg;
+ int rc = msg.init ();
+ errno_assert (rc == 0);
if (pipes [current]->check_write (&msg)) {
- zmq_msg_close (&msg);
+ rc = msg.close ();
+ errno_assert (rc == 0);
return true;
}
- zmq_msg_close (&msg);
+ rc = msg.close ();
+ errno_assert (rc == 0);
// Deactivate the pipe.
active--;
diff --git a/src/lb.hpp b/src/lb.hpp
index 0dc11e2..f844b01 100644
--- a/src/lb.hpp
+++ b/src/lb.hpp
@@ -38,7 +38,7 @@ namespace zmq
void attach (writer_t *pipe_);
void terminate ();
- int send (zmq_msg_t *msg_, int flags_);
+ int send (msg_t *msg_, int flags_);
bool has_out ();
// i_writer_events interface implementation.
diff --git a/src/msg.cpp b/src/msg.cpp
index e800bd6..bd6f066 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -29,155 +29,234 @@
#include "likely.hpp"
#include "err.hpp"
-int zmq_msg_init (zmq_msg_t *msg_)
+bool zmq::msg_t::check ()
{
- msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
- msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
- msg_->vsm_size = 0;
+ return u.base.type >= type_min && u.base.type <= type_max;
+}
+
+int zmq::msg_t::init ()
+{
+ u.vsm.type = type_vsm;
+ u.vsm.flags = 0;
+ u.vsm.size = 0;
return 0;
}
-int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
+int zmq::msg_t::init_size (size_t size_)
{
- if (size_ <= ZMQ_MAX_VSM_SIZE) {
- msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
- msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
- msg_->vsm_size = (uint8_t) size_;
+ if (size_ <= max_vsm_size) {
+ u.vsm.type = type_vsm;
+ u.vsm.flags = 0;
+ u.vsm.size = (unsigned char) size_;
}
else {
- msg_->content =
- (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_);
- if (!msg_->content) {
+ u.lmsg.type = type_lmsg;
+ u.lmsg.flags = 0;
+ u.lmsg.content =
+ (content_t*) malloc (sizeof (content_t) + size_);
+ if (!u.lmsg.content) {
errno = ENOMEM;
return -1;
}
- msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
-
- zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
- content->data = (void*) (content + 1);
- content->size = size_;
- content->ffn = NULL;
- content->hint = NULL;
- new (&content->refcnt) zmq::atomic_counter_t ();
+
+ u.lmsg.content->data = u.lmsg.content + 1;
+ u.lmsg.content->size = size_;
+ u.lmsg.content->ffn = NULL;
+ u.lmsg.content->hint = NULL;
+ new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
}
return 0;
}
-int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
- zmq_free_fn *ffn_, void *hint_)
+int zmq::msg_t::init_data (void *data_, size_t size_, zmq_free_fn *ffn_,
+ void *hint_)
{
- msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
- alloc_assert (msg_->content);
- msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
- zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
- content->data = data_;
- content->size = size_;
- content->ffn = ffn_;
- content->hint = hint_;
- new (&content->refcnt) zmq::atomic_counter_t ();
+ u.lmsg.type = type_lmsg;
+ u.lmsg.flags = 0;
+ u.lmsg.content = (content_t*) malloc (sizeof (content_t));
+ alloc_assert (u.lmsg.content);
+
+ u.lmsg.content->data = data_;
+ u.lmsg.content->size = size_;
+ u.lmsg.content->ffn = ffn_;
+ u.lmsg.content->hint = hint_;
+ new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
return 0;
+
}
-int zmq_msg_close (zmq_msg_t *msg_)
+int zmq::msg_t::init_delimiter ()
{
- // Check the validity tag.
- if (unlikely (msg_->flags | ZMQ_MSG_MASK) != 0xff) {
+ u.delimiter.type = type_delimiter;
+ u.delimiter.flags = 0;
+ return 0;
+}
+
+int zmq::msg_t::close ()
+{
+ // Check the validity of the message.
+ if (unlikely (!check ())) {
errno = EFAULT;
return -1;
}
- // For VSMs and delimiters there are no resources to free.
- if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
- msg_->content != (zmq::msg_content_t*) ZMQ_VSM) {
+ if (u.base.type == type_lmsg) {
- // If the content is not shared, or if it is shared and the reference.
+ // If the content is not shared, or if it is shared and the reference
// count has dropped to zero, deallocate it.
- zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
- if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) {
+ if (!(u.lmsg.flags & msg_t::shared) ||
+ !u.lmsg.content->refcnt.sub (1)) {
- // We used "placement new" operator to initialize the reference.
- // counter so we call its destructor now.
- content->refcnt.~atomic_counter_t ();
+ // We used "placement new" operator to initialize the reference
+ // counter so we call the destructor explicitly now.
+ u.lmsg.content->refcnt.~atomic_counter_t ();
- if (content->ffn)
- content->ffn (content->data, content->hint);
- free (content);
+ if (u.lmsg.content->ffn)
+ u.lmsg.content->ffn (u.lmsg.content->data,
+ u.lmsg.content->hint);
+ free (u.lmsg.content);
}
}
- // Remove the validity tag from the message.
- msg_->flags = 0;
+ // Make the message invalid.
+ u.base.type = 0;
return 0;
+
}
-int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
+int zmq::msg_t::move (msg_t &src_)
{
- // Check the validity tags.
- if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff ||
- (src_->flags | ZMQ_MSG_MASK) != 0xff)) {
+ // Check the validity of the source.
+ if (unlikely (!src_.check ())) {
errno = EFAULT;
return -1;
}
- zmq_msg_close (dest_);
- *dest_ = *src_;
- zmq_msg_init (src_);
+ int rc = close ();
+ if (unlikely (rc < 0))
+ return rc;
+
+ *this = src_;
+
+ rc = src_.init ();
+ if (unlikely (rc < 0))
+ return rc;
+
return 0;
}
-int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
+int zmq::msg_t::copy (msg_t &src_)
{
- // Check the validity tags.
- if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff ||
- (src_->flags | ZMQ_MSG_MASK) != 0xff)) {
+ // Check the validity of the source.
+ if (unlikely (!src_.check ())) {
errno = EFAULT;
return -1;
}
- zmq_msg_close (dest_);
+ int rc = close ();
+ if (unlikely (rc < 0))
+ return rc;
- // VSMs and delimiters require no special handling.
- if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
- src_->content != (zmq::msg_content_t*) ZMQ_VSM) {
+ if (src_.u.base.type == type_lmsg) {
// One reference is added to shared messages. Non-shared messages
// are turned into shared messages and reference count is set to 2.
- zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content;
- if (src_->flags & ZMQ_MSG_SHARED)
- content->refcnt.add (1);
+ if (src_.u.lmsg.flags & msg_t::shared)
+ src_.u.lmsg.content->refcnt.add (1);
else {
- src_->flags |= ZMQ_MSG_SHARED;
- content->refcnt.set (2);
+ src_.u.lmsg.flags |= msg_t::shared;
+ src_.u.lmsg.content->refcnt.set (2);
}
}
- *dest_ = *src_;
+ *this = src_;
+
return 0;
+
+}
+
+void *zmq::msg_t::data ()
+{
+ // Check the validity of the message.
+ zmq_assert (check ());
+
+ switch (u.base.type) {
+ case type_vsm:
+ return u.vsm.data;
+ case type_lmsg:
+ return u.lmsg.content->data;
+ default:
+ zmq_assert (false);
+ }
}
-void *zmq_msg_data (zmq_msg_t *msg_)
+size_t zmq::msg_t::size ()
{
- // Check the validity tag.
- zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff);
+ // Check the validity of the message.
+ zmq_assert (check ());
- if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
- return msg_->vsm_data;
- if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
- return NULL;
+ switch (u.base.type) {
+ case type_vsm:
+ return u.vsm.size;
+ case type_lmsg:
+ return u.lmsg.content->size;
+ default:
+ zmq_assert (false);
+ }
+}
- return ((zmq::msg_content_t*) msg_->content)->data;
+unsigned char zmq::msg_t::flags ()
+{
+ return u.base.flags;
}
-size_t zmq_msg_size (zmq_msg_t *msg_)
+void zmq::msg_t::set_flags (unsigned char flags_)
{
- // Check the validity tag.
- zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff);
+ u.base.flags |= flags_;
+}
- if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
- return msg_->vsm_size;
- if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
- return 0;
+void zmq::msg_t::reset_flags (unsigned char flags_)
+{
+ u.base.flags &= ~flags_;
+}
+
+bool zmq::msg_t::is_delimiter ()
+{
+ return u.base.type == type_delimiter;
+}
+
+void zmq::msg_t::add_refs (int refs_)
+{
+ zmq_assert (refs_ >= 0);
- return ((zmq::msg_content_t*) msg_->content)->size;
+ // No copies required.
+ if (!refs_)
+ return;
+
+ // VSMs and delimiters can be copied straight away. The only message type
+ // that needs special care are long messages.
+ if (u.base.type == type_lmsg) {
+ if (u.lmsg.flags & msg_t::shared)
+ u.lmsg.content->refcnt.add (refs_);
+ else {
+ u.lmsg.content->refcnt.set (refs_ + 1);
+ u.lmsg.flags |= msg_t::shared;
+ }
+ }
+}
+
+void zmq::msg_t::rm_refs (int refs_)
+{
+ zmq_assert (refs_ >= 0);
+
+ // No copies required.
+ if (!refs_)
+ return;
+
+ // The only message type that needs special care are long messages.
+ if (u.base.type == type_lmsg) {
+ zmq_assert (u.lmsg.flags & msg_t::shared);
+ u.lmsg.content->refcnt.sub (refs_);
+ }
}
diff --git a/src/msg.hpp b/src/msg.hpp
index 7e22098..b7d21f6 100644
--- a/src/msg.hpp
+++ b/src/msg.hpp
@@ -23,28 +23,105 @@
#include <stddef.h>
-#include "../include/zmq.h"
-
+#include "config.hpp"
#include "atomic_counter.hpp"
namespace zmq
{
- // Shared message buffer. Message data are either allocated in one
- // continuous block along with this structure - thus avoiding one
- // malloc/free pair or they are stored in used-supplied memory.
- // In the latter case, ffn member stores pointer to the function to be
- // used to deallocate the data. If the buffer is actually shared (there
- // are at least 2 references to it) refcount member contains number of
- // references.
+ // Note that this structure needs to be explicitly constructed
+ // (init functions) and destructed (close function).
- struct msg_content_t
+ class msg_t
{
- void *data;
- size_t size;
- zmq_free_fn *ffn;
- void *hint;
- zmq::atomic_counter_t refcnt;
+ public:
+
+ // Mesage flags.
+ enum
+ {
+ more = 1,
+ shared = 128
+ };
+
+ // Signature for free function to deallocate the message content.
+ typedef void (free_fn_t) (void *data, void *hint);
+
+ bool check ();
+ int init ();
+ int init_size (size_t size_);
+ int init_data (void *data_, size_t size_, free_fn_t *ffn_,
+ void *hint_);
+ int init_delimiter ();
+ int close ();
+ int move (msg_t &src_);
+ int copy (msg_t &src_);
+ void *data ();
+ size_t size ();
+ unsigned char flags ();
+ void set_flags (unsigned char flags_);
+ void reset_flags (unsigned char flags_);
+ bool is_delimiter ();
+
+ // After calling this function you can copy the message in POD-style
+ // refs_ times. No need to call copy.
+ void add_refs (int refs_);
+
+ // Removes references previously added by add_refs.
+ void rm_refs (int refs_);
+
+ private:
+
+ // Shared message buffer. Message data are either allocated in one
+ // continuous block along with this structure - thus avoiding one
+ // malloc/free pair or they are stored in used-supplied memory.
+ // In the latter case, ffn member stores pointer to the function to be
+ // used to deallocate the data. If the buffer is actually shared (there
+ // are at least 2 references to it) refcount member contains number of
+ // references.
+ struct content_t
+ {
+ void *data;
+ size_t size;
+ free_fn_t *ffn;
+ void *hint;
+ zmq::atomic_counter_t refcnt;
+ };
+
+ // Different message types.
+ enum type_t
+ {
+ type_min = 101,
+ type_vsm = 101,
+ type_lmsg = 102,
+ type_delimiter = 103,
+ type_max = 103
+ };
+
+ // Note that fields shared between different message types are not
+ // moved to tha parent class (msg_t). This way we ger tighter packing
+ // of the data. Shared fields can be accessed via 'base' member of
+ // the union.
+ union {
+ struct {
+ unsigned char type;
+ unsigned char flags;
+ } base;
+ struct {
+ unsigned char type;
+ unsigned char flags;
+ unsigned char size;
+ unsigned char data [max_vsm_size];
+ } vsm;
+ struct {
+ unsigned char type;
+ unsigned char flags;
+ content_t *content;
+ } lmsg;
+ struct {
+ unsigned char type;
+ unsigned char flags;
+ } delimiter;
+ } u;
};
}
diff --git a/src/object.hpp b/src/object.hpp
index 706303b..0f5e61b 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -21,8 +21,6 @@
#ifndef __ZMQ_OBJECT_HPP_INCLUDED__
#define __ZMQ_OBJECT_HPP_INCLUDED__
-#include "../include/zmq.h"
-
#include "stdint.hpp"
#include "blob.hpp"
diff --git a/src/options.cpp b/src/options.cpp
index 556ffd8..399fd27 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -20,8 +20,6 @@
#include <string.h>
-#include "../include/zmq.h"
-
#include "options.hpp"
#include "err.hpp"
diff --git a/src/pair.cpp b/src/pair.cpp
index 1acc60f..d877b54 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -18,11 +18,10 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "pair.hpp"
#include "err.hpp"
#include "pipe.hpp"
+#include "msg.hpp"
zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
@@ -116,7 +115,7 @@ void zmq::pair_t::activated (class writer_t *pipe_)
outpipe_alive = true;
}
-int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_)
+int zmq::pair_t::xsend (msg_t *msg_, int flags_)
{
if (outpipe == NULL || !outpipe_alive) {
errno = EAGAIN;
@@ -133,16 +132,17 @@ int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_)
outpipe->flush ();
// Detach the original message from the data buffer.
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
+ int rc = msg_->init ();
+ errno_assert (rc == 0);
return 0;
}
-int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_)
+int zmq::pair_t::xrecv (msg_t *msg_, int flags_)
{
// Deallocate old content of the message.
- zmq_msg_close (msg_);
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) {
@@ -150,7 +150,8 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_)
inpipe_alive = false;
// Initialise the output parameter to be a 0-byte message.
- zmq_msg_init (msg_);
+ rc = msg_->init ();
+ errno_assert (rc == 0);
errno = EAGAIN;
return -1;
}
@@ -171,10 +172,12 @@ bool zmq::pair_t::xhas_out ()
if (!outpipe || !outpipe_alive)
return false;
- zmq_msg_t msg;
- zmq_msg_init (&msg);
+ msg_t msg;
+ int rc = msg.init ();
+ errno_assert (rc == 0);
outpipe_alive = outpipe->check_write (&msg);
- zmq_msg_close (&msg);
+ rc = msg.close ();
+ errno_assert (rc == 0);
return outpipe_alive;
}
diff --git a/src/pair.hpp b/src/pair.hpp
index 54e60b5..a10e15a 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -40,8 +40,8 @@ namespace zmq
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- int xsend (zmq_msg_t *msg_, int flags_);
- int xrecv (zmq_msg_t *msg_, int flags_);
+ int xsend (class msg_t *msg_, int flags_);
+ int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 2af2dc2..36dc808 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -20,8 +20,6 @@
#include <new>
-#include "../include/zmq.h"
-
#include "pipe.hpp"
#include "likely.hpp"
@@ -53,11 +51,12 @@ zmq::reader_t::~reader_t ()
zmq_assert (pipe);
// First delete all the unread messages in the pipe. We have to do it by
- // hand because zmq_msg_t is a POD, not a class, so there's no associated
- // destructor.
- zmq_msg_t msg;
- while (pipe->read (&msg))
- zmq_msg_close (&msg);
+ // hand because msg_t doesn't have automatic destructor.
+ msg_t msg;
+ while (pipe->read (&msg)) {
+ int rc = msg.close ();
+ errno_assert (rc == 0);
+ }
delete pipe;
}
@@ -68,11 +67,9 @@ void zmq::reader_t::set_event_sink (i_reader_events *sink_)
sink = sink_;
}
-bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_)
+bool zmq::reader_t::is_delimiter (msg_t &msg_)
{
- unsigned char *offset = 0;
-
- return msg_.content == (void*) (offset + ZMQ_DELIMITER);
+ return msg_.is_delimiter ();
}
bool zmq::reader_t::check_read ()
@@ -89,7 +86,7 @@ bool zmq::reader_t::check_read ()
// If the next item in the pipe is message delimiter,
// initiate its termination.
if (pipe->probe (is_delimiter)) {
- zmq_msg_t msg;
+ msg_t msg;
bool ok = pipe->read (&msg);
zmq_assert (ok);
if (sink)
@@ -101,7 +98,7 @@ bool zmq::reader_t::check_read ()
return true;
}
-bool zmq::reader_t::read (zmq_msg_t *msg_)
+bool zmq::reader_t::read (msg_t *msg_)
{
if (!active)
return false;
@@ -112,15 +109,14 @@ bool zmq::reader_t::read (zmq_msg_t *msg_)
}
// If delimiter was read, start termination process of the pipe.
- unsigned char *offset = 0;
- if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) {
+ if (msg_->is_delimiter ()) {
if (sink)
sink->delimited (this);
terminate ();
return false;
}
- if (!(msg_->flags & ZMQ_MSG_MORE))
+ if (!(msg_->flags () & msg_t::more))
msgs_read++;
if (lwm > 0 && msgs_read % lwm == 0)
@@ -187,7 +183,7 @@ void zmq::writer_t::set_event_sink (i_writer_events *sink_)
sink = sink_;
}
-bool zmq::writer_t::check_write (zmq_msg_t *msg_)
+bool zmq::writer_t::check_write (msg_t *msg_)
{
// We've already checked and there's no space free for the new message.
// There's no point in checking once again.
@@ -202,13 +198,13 @@ bool zmq::writer_t::check_write (zmq_msg_t *msg_)
return true;
}
-bool zmq::writer_t::write (zmq_msg_t *msg_)
+bool zmq::writer_t::write (msg_t *msg_)
{
if (unlikely (!check_write (msg_)))
return false;
- pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);
- if (!(msg_->flags & ZMQ_MSG_MORE))
+ pipe->write (*msg_, msg_->flags () & msg_t::more);
+ if (!(msg_->flags () & msg_t::more))
msgs_written++;
return true;
@@ -217,10 +213,11 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)
void zmq::writer_t::rollback ()
{
// Remove incomplete message from the pipe.
- zmq_msg_t msg;
+ msg_t msg;
while (pipe->unwrite (&msg)) {
- zmq_assert (msg.flags & ZMQ_MSG_MORE);
- zmq_msg_close (&msg);
+ zmq_assert (msg.flags () & msg_t::more);
+ int rc = msg.close ();
+ errno_assert (rc == 0);
}
}
@@ -246,10 +243,8 @@ void zmq::writer_t::terminate ()
// Push delimiter into the pipe. Trick the compiler to belive that
// the tag is a valid pointer. Note that watermarks are not checked
// thus the delimiter can be written even though the pipe is full.
- zmq_msg_t msg;
- const unsigned char *offset = 0;
- msg.content = (void*) (offset + ZMQ_DELIMITER);
- msg.flags = 0;
+ msg_t msg;
+ msg.init_delimiter ();
pipe->write (msg, false);
flush ();
}
diff --git a/src/pipe.hpp b/src/pipe.hpp
index 3230d02..75b5c47 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -21,8 +21,7 @@
#ifndef __ZMQ_PIPE_HPP_INCLUDED__
#define __ZMQ_PIPE_HPP_INCLUDED__
-#include "../include/zmq.h"
-
+#include "msg.hpp"
#include "array.hpp"
#include "ypipe.hpp"
#include "config.hpp"
@@ -43,7 +42,7 @@ namespace zmq
// event. When endpoint processes the event and returns, associated
// reader/writer object is deallocated.
- typedef ypipe_t <zmq_msg_t, message_pipe_granularity> pipe_t;
+ typedef ypipe_t <msg_t, message_pipe_granularity> pipe_t;
struct i_reader_events
{
@@ -69,7 +68,7 @@ namespace zmq
bool check_read ();
// Reads a message to the underlying pipe.
- bool read (zmq_msg_t *msg_);
+ bool read (msg_t *msg_);
// Ask pipe to terminate.
void terminate ();
@@ -87,7 +86,7 @@ namespace zmq
void process_pipe_term_ack ();
// Returns true if the message is delimiter; false otherwise.
- static bool is_delimiter (zmq_msg_t &msg_);
+ static bool is_delimiter (msg_t &msg_);
// True, if pipe can be read from.
bool active;
@@ -136,11 +135,11 @@ namespace zmq
// Checks whether messages can be written to the pipe.
// If writing the message would cause high watermark
// the function returns false.
- bool check_write (zmq_msg_t *msg_);
+ bool check_write (msg_t *msg_);
// Writes a message to the underlying pipe. Returns false if the
// message cannot be written because high watermark was reached.
- bool write (zmq_msg_t *msg_);
+ bool write (msg_t *msg_);
// Remove unfinished part of a message from the pipe.
void rollback ();
diff --git a/src/pub.cpp b/src/pub.cpp
index 74f07fc..8558265 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -19,6 +19,7 @@
*/
#include "pub.hpp"
+#include "msg.hpp"
zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_) :
xpub_t (parent_, tid_)
diff --git a/src/pull.cpp b/src/pull.cpp
index a8c2466..b9d4433 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -18,10 +18,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "pull.hpp"
#include "err.hpp"
+#include "msg.hpp"
zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
@@ -49,7 +48,7 @@ void zmq::pull_t::process_term (int linger_)
socket_base_t::process_term (linger_);
}
-int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_)
+int zmq::pull_t::xrecv (msg_t *msg_, int flags_)
{
return fq.recv (msg_, flags_);
}
diff --git a/src/pull.hpp b/src/pull.hpp
index 95084ba..ffc3fdb 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -39,7 +39,7 @@ namespace zmq
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- int xrecv (zmq_msg_t *msg_, int flags_);
+ int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
private:
diff --git a/src/push.cpp b/src/push.cpp
index 072994f..d6ee399 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -18,11 +18,10 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "push.hpp"
-#include "err.hpp"
#include "pipe.hpp"
+#include "err.hpp"
+#include "msg.hpp"
zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
@@ -50,7 +49,7 @@ void zmq::push_t::process_term (int linger_)
socket_base_t::process_term (linger_);
}
-int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_)
+int zmq::push_t::xsend (msg_t *msg_, int flags_)
{
return lb.send (msg_, flags_);
}
diff --git a/src/push.hpp b/src/push.hpp
index f04b25f..c4d63f6 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -39,7 +39,7 @@ namespace zmq
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- int xsend (zmq_msg_t *msg_, int flags_);
+ int xsend (class msg_t *msg_, int flags_);
bool xhas_out ();
private:
diff --git a/src/rep.cpp b/src/rep.cpp
index 46c35cb..ef0defc 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -18,10 +18,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "rep.hpp"
#include "err.hpp"
+#include "msg.hpp"
zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t tid_) :
xrep_t (parent_, tid_),
@@ -35,7 +34,7 @@ zmq::rep_t::~rep_t ()
{
}
-int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
+int zmq::rep_t::xsend (msg_t *msg_, int flags_)
{
// If we are in the middle of receiving a request, we cannot send reply.
if (!sending_reply) {
@@ -43,7 +42,7 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
return -1;
}
- bool more = (msg_->flags & ZMQ_MSG_MORE);
+ bool more = (msg_->flags () & msg_t::more);
// Push message to the reply pipe.
int rc = xrep_t::xsend (msg_, flags_);
@@ -57,7 +56,7 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
return 0;
}
-int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
+int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
{
// If we are in middle of sending a reply, we cannot receive next request.
if (sending_reply) {
@@ -78,10 +77,10 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
int rc = xrep_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
- zmq_assert (msg_->flags & ZMQ_MSG_MORE);
+ zmq_assert (msg_->flags () & msg_t::more);
// Empty message part delimits the traceback stack.
- bottom = (zmq_msg_size (msg_) == 0);
+ bottom = (msg_->size () == 0);
// Push it to the reply pipe.
rc = xrep_t::xsend (msg_, flags_);
@@ -98,7 +97,7 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
return rc;
// If whole request is read, flip the FSM to reply-sending state.
- if (!(msg_->flags & ZMQ_MSG_MORE)) {
+ if (!(msg_->flags () & msg_t::more)) {
sending_reply = true;
request_begins = true;
}
diff --git a/src/rep.hpp b/src/rep.hpp
index d0dd9c8..a13853d 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -34,8 +34,8 @@ namespace zmq
~rep_t ();
// Overloads of functions from socket_base_t.
- int xsend (zmq_msg_t *msg_, int flags_);
- int xrecv (zmq_msg_t *msg_, int flags_);
+ int xsend (class msg_t *msg_, int flags_);
+ int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
diff --git a/src/req.cpp b/src/req.cpp
index 503f221..6bf502f 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -18,10 +18,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "req.hpp"
#include "err.hpp"
+#include "msg.hpp"
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) :
xreq_t (parent_, tid_),
@@ -35,7 +34,7 @@ zmq::req_t::~req_t ()
{
}
-int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
+int zmq::req_t::xsend (msg_t *msg_, int flags_)
{
// If we've sent a request and we still haven't got the reply,
// we can't send another request.
@@ -46,17 +45,17 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
// First part of the request is empty message part (stack bottom).
if (message_begins) {
- zmq_msg_t prefix;
- int rc = zmq_msg_init (&prefix);
- zmq_assert (rc == 0);
- prefix.flags |= ZMQ_MSG_MORE;
+ msg_t prefix;
+ int rc = prefix.init ();
+ errno_assert (rc == 0);
+ prefix.set_flags (msg_t::more);
rc = xreq_t::xsend (&prefix, flags_);
if (rc != 0)
return rc;
message_begins = false;
}
- bool more = msg_->flags & ZMQ_MSG_MORE;
+ bool more = msg_->flags () & msg_t::more;
int rc = xreq_t::xsend (msg_, flags_);
if (rc != 0)
@@ -71,7 +70,7 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
return 0;
}
-int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
+int zmq::req_t::xrecv (msg_t *msg_, int flags_)
{
// If request wasn't send, we can't wait for reply.
if (!receiving_reply) {
@@ -84,8 +83,8 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
int rc = xreq_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
- zmq_assert (msg_->flags & ZMQ_MSG_MORE);
- zmq_assert (zmq_msg_size (msg_) == 0);
+ zmq_assert (msg_->flags () & msg_t::more);
+ zmq_assert (msg_->size () == 0);
message_begins = false;
}
@@ -94,7 +93,7 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
return rc;
// If the reply is fully received, flip the FSM into request-sending state.
- if (!(msg_->flags & ZMQ_MSG_MORE)) {
+ if (!(msg_->flags () & msg_t::more)) {
receiving_reply = false;
message_begins = true;
}
diff --git a/src/req.hpp b/src/req.hpp
index 3138498..e0554ac 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -34,8 +34,8 @@ namespace zmq
~req_t ();
// Overloads of functions from socket_base_t.
- int xsend (zmq_msg_t *msg_, int flags_);
- int xrecv (zmq_msg_t *msg_, int flags_);
+ int xsend (class msg_t *msg_, int flags_);
+ int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
diff --git a/src/session.cpp b/src/session.cpp
index 5f970cc..499fe40 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -80,7 +80,7 @@ void zmq::session_t::proceed_with_term ()
own_t::process_term (0);
}
-bool zmq::session_t::read (::zmq_msg_t *msg_)
+bool zmq::session_t::read (msg_t *msg_)
{
if (!in_pipe)
return false;
@@ -88,14 +88,15 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
if (!in_pipe->read (msg_))
return false;
- incomplete_in = msg_->flags & ZMQ_MSG_MORE;
+ incomplete_in = msg_->flags () & msg_t::more;
return true;
}
-bool zmq::session_t::write (::zmq_msg_t *msg_)
+bool zmq::session_t::write (msg_t *msg_)
{
if (out_pipe && out_pipe->write (msg_)) {
- zmq_msg_init (msg_);
+ int rc = msg_->init ();
+ errno_assert (rc == 0);
return true;
}
@@ -120,13 +121,15 @@ void zmq::session_t::clean_pipes ()
// Remove any half-read message from the in pipe.
if (in_pipe) {
while (incomplete_in) {
- zmq_msg_t msg;
- zmq_msg_init (&msg);
+ msg_t msg;
+ int rc = msg.init ();
+ errno_assert (rc == 0);
if (!read (&msg)) {
zmq_assert (!incomplete_in);
break;
}
- zmq_msg_close (&msg);
+ rc = msg.close ();
+ errno_assert (rc == 0);
}
}
}
diff --git a/src/session.hpp b/src/session.hpp
index 570daa1..d2f8882 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -45,8 +45,8 @@ namespace zmq
// i_inout interface implementation. Note that detach method is not
// implemented by generic session. Different session types may handle
// engine disconnection in different ways.
- bool read (::zmq_msg_t *msg_);
- bool write (::zmq_msg_t *msg_);
+ bool read (msg_t *msg_);
+ bool write (msg_t *msg_);
void flush ();
void detach ();
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 9f3b1f6..d8af516 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -22,8 +22,6 @@
#include <string>
#include <algorithm>
-#include "../include/zmq.h"
-
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
@@ -48,6 +46,7 @@
#include "platform.hpp"
#include "likely.hpp"
#include "uuid.hpp"
+#include "msg.hpp"
#include "pair.hpp"
#include "pub.hpp"
@@ -464,7 +463,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return 0;
}
-int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
+int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
@@ -473,7 +472,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
}
// Check whether message passed to the function is valid.
- if (unlikely ((msg_->flags | ZMQ_MSG_MASK) != 0xff)) {
+ if (unlikely (!msg_->check ())) {
errno = EFAULT;
return -1;
}
@@ -485,7 +484,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
// At this point we impose the MORE flag on the message.
if (flags_ & ZMQ_SNDMORE)
- msg_->flags |= ZMQ_MSG_MORE;
+ msg_->set_flags (msg_t::more);
// Try to send the message.
rc = xsend (msg_, flags_);
@@ -509,7 +508,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
return 0;
}
-int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
+int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
@@ -518,7 +517,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
}
// Check whether message passed to the function is valid.
- if (unlikely ((msg_->flags | ZMQ_MSG_MASK) != 0xff)) {
+ if (unlikely (!msg_->check ())) {
errno = EFAULT;
return -1;
}
@@ -543,9 +542,9 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// If we have the message, return immediately.
if (rc == 0) {
- rcvmore = msg_->flags & ZMQ_MSG_MORE;
+ rcvmore = msg_->flags () & msg_t::more;
if (rcvmore)
- msg_->flags &= ~ZMQ_MSG_MORE;
+ msg_->reset_flags (msg_t::more);
return 0;
}
@@ -565,9 +564,9 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
rc = xrecv (msg_, flags_);
if (rc == 0) {
- rcvmore = msg_->flags & ZMQ_MSG_MORE;
+ rcvmore = msg_->flags () & msg_t::more;
if (rcvmore)
- msg_->flags &= ~ZMQ_MSG_MORE;
+ msg_->reset_flags (msg_t::more);
}
return rc;
}
@@ -585,9 +584,9 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
block = true;
}
- rcvmore = msg_->flags & ZMQ_MSG_MORE;
+ rcvmore = msg_->flags () & msg_t::more;
if (rcvmore)
- msg_->flags &= ~ZMQ_MSG_MORE;
+ msg_->reset_flags (msg_t::more);
return 0;
}
@@ -757,7 +756,7 @@ bool zmq::socket_base_t::xhas_out ()
return false;
}
-int zmq::socket_base_t::xsend (zmq_msg_t *msg_, int options_)
+int zmq::socket_base_t::xsend (msg_t *msg_, int options_)
{
errno = ENOTSUP;
return -1;
@@ -768,7 +767,7 @@ bool zmq::socket_base_t::xhas_in ()
return false;
}
-int zmq::socket_base_t::xrecv (zmq_msg_t *msg_, int options_)
+int zmq::socket_base_t::xrecv (msg_t *msg_, int options_)
{
errno = ENOTSUP;
return -1;
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 333cddd..0a5c574 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -24,8 +24,6 @@
#include <map>
#include <vector>
-#include "../include/zmq.h"
-
#include "own.hpp"
#include "array.hpp"
#include "mutex.hpp"
@@ -69,8 +67,8 @@ namespace zmq
int getsockopt (int option_, void *optval_, size_t *optvallen_);
int bind (const char *addr_);
int connect (const char *addr_);
- int send (zmq_msg_t *msg_, int flags_);
- int recv (zmq_msg_t *msg_, int flags_);
+ int send (class msg_t *msg_, int flags_);
+ int recv (class msg_t *msg_, int flags_);
int close ();
// These functions are used by the polling mechanism to determine
@@ -123,11 +121,11 @@ namespace zmq
// The default implementation assumes that send is not supported.
virtual bool xhas_out ();
- virtual int xsend (zmq_msg_t *msg_, int options_);
+ virtual int xsend (class msg_t *msg_, int options_);
// The default implementation assumes that recv in not supported.
virtual bool xhas_in ();
- virtual int xrecv (zmq_msg_t *msg_, int options_);
+ virtual int xrecv (class msg_t *msg_, int options_);
// We are declaring termination handler as protected so that
// individual socket types can hook into the termination process
diff --git a/src/sub.cpp b/src/sub.cpp
index aef7369..2d6ade6 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -18,9 +18,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "sub.hpp"
+#include "msg.hpp"
zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) :
xsub_t (parent_, tid_)
@@ -41,9 +40,10 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
}
// Create the subscription message.
- zmq_msg_t msg;
- zmq_msg_init_size (&msg, optvallen_ + 1);
- unsigned char *data = (unsigned char*) zmq_msg_data (&msg);
+ msg_t msg;
+ int rc = msg.init_size (optvallen_ + 1);
+ errno_assert (rc == 0);
+ unsigned char *data = (unsigned char*) msg.data ();
if (option_ == ZMQ_SUBSCRIBE)
*data = 1;
else if (option_ == ZMQ_UNSUBSCRIBE)
@@ -52,16 +52,17 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
// Pass it further on in the stack.
int err = 0;
- int rc = xsub_t::xsend (&msg, 0);
+ rc = xsub_t::xsend (&msg, 0);
if (rc != 0)
err = errno;
- zmq_msg_close (&msg);
+ int rc2 = msg.close ();
+ errno_assert (rc2 == 0);
if (rc != 0)
errno = err;
return rc;
}
-int zmq::sub_t::xsend (zmq_msg_t *msg_, int options_)
+int zmq::sub_t::xsend (msg_t *msg_, int options_)
{
// Overload the XSUB's send.
errno = ENOTSUP;
diff --git a/src/sub.hpp b/src/sub.hpp
index d1f467d..8575961 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -36,7 +36,7 @@ namespace zmq
protected:
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
- int xsend (zmq_msg_t *msg_, int options_);
+ int xsend (class msg_t *msg_, int options_);
bool xhas_out ();
private:
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index 0c1581d..f00d478 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -22,8 +22,6 @@
#include <string>
-#include "../include/zmq.h"
-
#include "tcp_connecter.hpp"
#include "platform.hpp"
#include "ip.hpp"
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 8de564f..f40b0fe 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -20,8 +20,6 @@
#include <string.h>
-#include "../include/zmq.h"
-
#include "tcp_listener.hpp"
#include "platform.hpp"
#include "ip.hpp"
diff --git a/src/xpub.cpp b/src/xpub.cpp
index ed56183..2b5c4eb 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -18,11 +18,10 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "xpub.hpp"
-#include "err.hpp"
#include "pipe.hpp"
+#include "err.hpp"
+#include "msg.hpp"
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
@@ -53,7 +52,7 @@ void zmq::xpub_t::process_term (int linger_)
socket_base_t::process_term (linger_);
}
-int zmq::xpub_t::xsend (zmq_msg_t *msg_, int flags_)
+int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
{
return dist.send (msg_, flags_);
}
@@ -63,7 +62,7 @@ bool zmq::xpub_t::xhas_out ()
return dist.has_out ();
}
-int zmq::xpub_t::xrecv (zmq_msg_t *msg_, int flags_)
+int zmq::xpub_t::xrecv (msg_t *msg_, int flags_)
{
errno = EAGAIN;
return -1;
diff --git a/src/xpub.hpp b/src/xpub.hpp
index e945198..19aa38a 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -39,9 +39,9 @@ namespace zmq
// Implementations of virtual functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- int xsend (zmq_msg_t *msg_, int flags_);
+ int xsend (class msg_t *msg_, int flags_);
bool xhas_out ();
- int xrecv (zmq_msg_t *msg_, int flags_);
+ int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
private:
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 75dc30e..5e01e2f 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -18,11 +18,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "xrep.hpp"
-#include "err.hpp"
#include "pipe.hpp"
+#include "err.hpp"
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
@@ -159,7 +157,7 @@ void zmq::xrep_t::activated (writer_t *pipe_)
zmq_assert (false);
}
-int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
+int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
{
// If this is the first part of the message it's the identity of the
// peer to send the message to.
@@ -168,44 +166,43 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
- if (msg_->flags & ZMQ_MSG_MORE) {
+ if (msg_->flags () & msg_t::more) {
more_out = true;
// Find the pipe associated with the identity stored in the prefix.
// If there's no such pipe just silently ignore the message.
- blob_t identity ((unsigned char*) zmq_msg_data (msg_),
- zmq_msg_size (msg_));
+ blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
outpipes_t::iterator it = outpipes.find (identity);
if (it != outpipes.end ()) {
current_out = it->second.writer;
- zmq_msg_t empty;
- int rc = zmq_msg_init (&empty);
- zmq_assert (rc == 0);
+ msg_t empty;
+ int rc = empty.init ();
+ errno_assert (rc == 0);
if (!current_out->check_write (&empty)) {
it->second.active = false;
more_out = false;
current_out = NULL;
- rc = zmq_msg_close (&empty);
- zmq_assert (rc == 0);
+ rc = empty.close ();
+ errno_assert (rc == 0);
errno = EAGAIN;
return -1;
}
- rc = zmq_msg_close (&empty);
- zmq_assert (rc == 0);
+ rc = empty.close ();
+ errno_assert (rc == 0);
}
}
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ rc = msg_->init ();
+ errno_assert (rc == 0);
return 0;
}
// Check whether this is the last part of the message.
- more_out = msg_->flags & ZMQ_MSG_MORE;
+ more_out = msg_->flags () & msg_t::more;
// Push the message into the pipe. If there's no out pipe, just drop it.
if (current_out) {
@@ -217,36 +214,38 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
}
}
else {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
}
// Detach the message from the data buffer.
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
+ int rc = msg_->init ();
+ errno_assert (rc == 0);
return 0;
}
-int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
+int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
{
// If there is a prefetched message, return it.
if (prefetched) {
- zmq_msg_move (msg_, &prefetched_msg);
- more_in = msg_->flags & ZMQ_MSG_MORE;
+ int rc = msg_->move (prefetched_msg);
+ errno_assert (rc == 0);
+ more_in = msg_->flags () & msg_t::more;
prefetched = false;
return 0;
}
// Deallocate old content of the message.
- zmq_msg_close (msg_);
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
// If we are in the middle of reading a message, just grab next part of it.
if (more_in) {
zmq_assert (inpipes [current_in].active);
bool fetched = inpipes [current_in].reader->read (msg_);
zmq_assert (fetched);
- more_in = msg_->flags & ZMQ_MSG_MORE;
+ more_in = msg_->flags () & msg_t::more;
if (!more_in) {
current_in++;
if (current_in >= inpipes.size ())
@@ -264,12 +263,11 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
// If we have a message, create a prefix and return it to the caller.
if (prefetched) {
- int rc = zmq_msg_init_size (msg_,
- inpipes [current_in].identity.size ());
- zmq_assert (rc == 0);
- memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (),
- zmq_msg_size (msg_));
- msg_->flags |= ZMQ_MSG_MORE;
+ int rc = msg_->init_size (inpipes [current_in].identity.size ());
+ errno_assert (rc == 0);
+ memcpy (msg_->data (), inpipes [current_in].identity.data (),
+ msg_->size ());
+ msg_->set_flags (msg_t::more);
return 0;
}
@@ -283,7 +281,8 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
// No message is available. Initialise the output parameter
// to be a 0-byte message.
- zmq_msg_init (msg_);
+ rc = msg_->init ();
+ errno_assert (rc == 0);
errno = EAGAIN;
return -1;
}
diff --git a/src/xrep.hpp b/src/xrep.hpp
index d7fbe9f..1c45655 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -27,6 +27,7 @@
#include "socket_base.hpp"
#include "blob.hpp"
#include "pipe.hpp"
+#include "msg.hpp"
namespace zmq
{
@@ -45,8 +46,8 @@ namespace zmq
// Overloads of functions from socket_base_t.
void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
const blob_t &peer_identity_);
- int xsend (zmq_msg_t *msg_, int flags_);
- int xrecv (zmq_msg_t *msg_, int flags_);
+ int xsend (class msg_t *msg_, int flags_);
+ int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
@@ -82,7 +83,7 @@ namespace zmq
bool prefetched;
// Holds the prefetched message.
- zmq_msg_t prefetched_msg;
+ msg_t prefetched_msg;
// If true, more incoming message parts are expected.
bool more_in;
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 96f1bba..2fda2c1 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -18,10 +18,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "xreq.hpp"
#include "err.hpp"
+#include "msg.hpp"
zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
@@ -52,12 +51,12 @@ void zmq::xreq_t::process_term (int linger_)
socket_base_t::process_term (linger_);
}
-int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
+int zmq::xreq_t::xsend (msg_t *msg_, int flags_)
{
return lb.send (msg_, flags_);
}
-int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_)
+int zmq::xreq_t::xrecv (msg_t *msg_, int flags_)
{
return fq.recv (msg_, flags_);
}
diff --git a/src/xreq.hpp b/src/xreq.hpp
index 73af21f..e0cafe5 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -41,8 +41,8 @@ namespace zmq
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- int xsend (zmq_msg_t *msg_, int flags_);
- int xrecv (zmq_msg_t *msg_, int flags_);
+ int xsend (class msg_t *msg_, int flags_);
+ int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
diff --git a/src/xsub.cpp b/src/xsub.cpp
index b0c5795..b0e8cd2 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -20,8 +20,6 @@
#include <string.h>
-#include "../include/zmq.h"
-
#include "xsub.hpp"
#include "err.hpp"
@@ -34,12 +32,14 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) :
options.type = ZMQ_XSUB;
options.requires_in = true;
options.requires_out = false;
- zmq_msg_init (&message);
+ int rc = message.init ();
+ errno_assert (rc == 0);
}
zmq::xsub_t::~xsub_t ()
{
- zmq_msg_close (&message);
+ int rc = message.close ();
+ errno_assert (rc == 0);
}
void zmq::xsub_t::xattach_pipes (class reader_t *inpipe_,
@@ -55,10 +55,10 @@ void zmq::xsub_t::process_term (int linger_)
socket_base_t::process_term (linger_);
}
-int zmq::xsub_t::xsend (zmq_msg_t *msg_, int options_)
+int zmq::xsub_t::xsend (msg_t *msg_, int options_)
{
- size_t size = zmq_msg_size (msg_);
- unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
+ size_t size = msg_->size ();
+ unsigned char *data = (unsigned char*) msg_->data ();
// Malformed subscriptions are dropped silently.
if (size >= 1) {
@@ -72,10 +72,10 @@ int zmq::xsub_t::xsend (zmq_msg_t *msg_, int options_)
subscriptions.rm (data + 1, size - 1);
}
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ rc = msg_->init ();
+ errno_assert (rc == 0);
return 0;
}
@@ -85,14 +85,15 @@ bool zmq::xsub_t::xhas_out ()
return true;
}
-int zmq::xsub_t::xrecv (zmq_msg_t *msg_, int flags_)
+int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
{
// If there's already a message prepared by a previous call to zmq_poll,
// return it straight ahead.
if (has_message) {
- zmq_msg_move (msg_, &message);
+ int rc = msg_->move (message);
+ errno_assert (rc == 0);
has_message = false;
- more = msg_->flags & ZMQ_MSG_MORE;
+ more = msg_->flags () & msg_t::more;
return 0;
}
@@ -112,13 +113,13 @@ int zmq::xsub_t::xrecv (zmq_msg_t *msg_, int flags_)
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (more || match (msg_)) {
- more = msg_->flags & ZMQ_MSG_MORE;
+ more = msg_->flags () & msg_t::more;
return 0;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
- while (msg_->flags & ZMQ_MSG_MORE) {
+ while (msg_->flags () & msg_t::more) {
rc = fq.recv (msg_, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
@@ -158,15 +159,15 @@ bool zmq::xsub_t::xhas_in ()
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
- while (message.flags & ZMQ_MSG_MORE) {
+ while (message.flags () & msg_t::more) {
rc = fq.recv (&message, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
}
}
-bool zmq::xsub_t::match (zmq_msg_t *msg_)
+bool zmq::xsub_t::match (msg_t *msg_)
{
- return subscriptions.check ((unsigned char*) zmq_msg_data (msg_),
- zmq_msg_size (msg_));
+ return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
}
+
diff --git a/src/xsub.hpp b/src/xsub.hpp
index 6bd55ad..202a29f 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -21,10 +21,9 @@
#ifndef __ZMQ_XSUB_HPP_INCLUDED__
#define __ZMQ_XSUB_HPP_INCLUDED__
-#include "../include/zmq.h"
-
#include "trie.hpp"
#include "socket_base.hpp"
+#include "msg.hpp"
#include "fq.hpp"
namespace zmq
@@ -42,9 +41,9 @@ namespace zmq
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- int xsend (zmq_msg_t *msg_, int options_);
+ int xsend (class msg_t *msg_, int options_);
bool xhas_out ();
- int xrecv (zmq_msg_t *msg_, int flags_);
+ int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
private:
@@ -53,7 +52,7 @@ namespace zmq
void process_term (int linger_);
// Check whether the message matches at least one subscription.
- bool match (zmq_msg_t *msg_);
+ bool match (class msg_t *msg_);
// Fair queueing object for inbound pipes.
fq_t fq;
@@ -64,7 +63,7 @@ namespace zmq
// If true, 'message' contains a matching message to return on the
// next recv call.
bool has_message;
- zmq_msg_t message;
+ msg_t message;
// If true, part of a multipart message was already received, but
// there are following parts still waiting.
diff --git a/src/zmq.cpp b/src/zmq.cpp
index eb8cc40..b40d8b2 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -32,8 +32,6 @@
#include <poll.h>
#endif
-#include "../include/zmq.h"
-
#include <string.h>
#include <errno.h>
#include <stdlib.h>
@@ -46,6 +44,7 @@
#include "clock.hpp"
#include "ctx.hpp"
#include "err.hpp"
+#include "msg.hpp"
#include "fd.hpp"
#if !defined ZMQ_HAVE_WINDOWS
@@ -57,6 +56,10 @@
#include <pgm/pgm.h>
#endif
+// Compile time check whether msg_t fits into zmq_msg_t.
+typedef char check_msg_t_size
+ [sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];
+
void zmq_version (int *major_, int *minor_, int *patch_)
{
*major_ = ZMQ_VERSION_MAJOR;
@@ -260,7 +263,7 @@ int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
return -1;
}
int sz = (int) zmq_msg_size (msg_);
- int rc = (((zmq::socket_base_t*) s_)->send (msg_, flags_));
+ int rc = (((zmq::socket_base_t*) s_)->send ((zmq::msg_t*) msg_, flags_));
if (unlikely (rc < 0))
return -1;
return sz;
@@ -272,12 +275,53 @@ int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
errno = ENOTSOCK;
return -1;
}
- int rc = (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
+ int rc = (((zmq::socket_base_t*) s_)->recv ((zmq::msg_t*) msg_, flags_));
if (unlikely (rc < 0))
return -1;
return (int) zmq_msg_size (msg_);
}
+int zmq_msg_init (zmq_msg_t *msg_)
+{
+ return ((zmq::msg_t*) msg_)->init ();
+}
+
+int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
+{
+ return ((zmq::msg_t*) msg_)->init_size (size_);
+}
+
+int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
+ zmq_free_fn *ffn_, void *hint_)
+{
+ return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_);
+}
+
+int zmq_msg_close (zmq_msg_t *msg_)
+{
+ return ((zmq::msg_t*) msg_)->close ();
+}
+
+int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
+{
+ return ((zmq::msg_t*) dest_)->move (*(zmq::msg_t*) src_);
+}
+
+int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
+{
+ return ((zmq::msg_t*) dest_)->copy (*(zmq::msg_t*) src_);
+}
+
+void *zmq_msg_data (zmq_msg_t *msg_)
+{
+ return ((zmq::msg_t*) msg_)->data ();
+}
+
+size_t zmq_msg_size (zmq_msg_t *msg_)
+{
+ return ((zmq::msg_t*) msg_)->size ();
+}
+
#if defined ZMQ_FORCE_SELECT
#define ZMQ_POLL_BASED_ON_SELECT
#elif defined ZMQ_FORCE_POLL
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index 5ca2367..ca7c66d 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -53,28 +53,27 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
peer_identity.assign (identity, uuid_t::uuid_blob_len + 1);
// Create a list of props to send.
-
- zmq_msg_t msg;
- int rc = zmq_msg_init_size (&msg, 4);
+ msg_t msg;
+ int rc = msg.init_size (4);
errno_assert (rc == 0);
- unsigned char *data = (unsigned char*) zmq_msg_data (&msg);
+ unsigned char *data = (unsigned char*) msg.data ();
put_uint16 (data, prop_type);
put_uint16 (data + 2, options.type);
- msg.flags |= ZMQ_MSG_MORE;
+ msg.set_flags (msg_t::more);
to_send.push_back (msg);
if (!options.identity.empty ()) {
- rc = zmq_msg_init_size (&msg, 2 + options.identity.size ());
+ rc = msg.init_size (2 + options.identity.size ());
errno_assert (rc == 0);
- data = (unsigned char*) zmq_msg_data (&msg);
+ data = (unsigned char*) msg.data ();
put_uint16 (data, prop_identity);
memcpy (data + 2, options.identity.data (), options.identity.size ());
- msg.flags |= ZMQ_MSG_MORE;
+ msg.set_flags (msg_t::more);
to_send.push_back (msg);
}
// Remove the MORE flag from the last prop.
- to_send.back ().flags &= ~ZMQ_MSG_MORE;
+ to_send.back ().reset_flags (msg_t::more);
}
zmq::zmq_init_t::~zmq_init_t ()
@@ -85,13 +84,13 @@ zmq::zmq_init_t::~zmq_init_t ()
// If there are unsent props still queued deallocate them.
for (to_send_t::iterator it = to_send.begin (); it != to_send.end ();
++it) {
- int rc = zmq_msg_close (&(*it));
+ int rc = it->close ();
errno_assert (rc == 0);
}
to_send.clear ();
}
-bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
+bool zmq::zmq_init_t::read (msg_t *msg_)
{
// If the identity was already sent, do nothing.
if (to_send.empty ())
@@ -107,15 +106,15 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
return true;
}
-bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
+bool zmq::zmq_init_t::write (msg_t *msg_)
{
// If identity was already received, we are not interested
// in subsequent messages.
if (received)
return false;
- size_t size = zmq_msg_size (msg_);
- unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
+ size_t size = msg_->size ();
+ unsigned char *data = (unsigned char*) msg_->data ();
// There should be at least property type in the message.
zmq_assert (size >= 2);
@@ -139,7 +138,7 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
zmq_assert (false);
}
- if (!(msg_->flags & ZMQ_MSG_MORE)) {
+ if (!(msg_->flags () & msg_t::more)) {
received = true;
finalise_initialisation ();
}
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index 92ab05b..ec9b2b3 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -23,14 +23,13 @@
#include <vector>
-#include "../include/zmq.h"
-
#include "i_inout.hpp"
#include "i_engine.hpp"
-#include "own.hpp"
-#include "fd.hpp"
#include "stdint.hpp"
#include "blob.hpp"
+#include "msg.hpp"
+#include "own.hpp"
+#include "fd.hpp"
namespace zmq
{
@@ -58,8 +57,8 @@ namespace zmq
void dispatch_engine ();
// i_inout interface implementation.
- bool read (::zmq_msg_t *msg_);
- bool write (::zmq_msg_t *msg_);
+ bool read (class msg_t *msg_);
+ bool write (class msg_t *msg_);
void flush ();
void detach ();
@@ -75,7 +74,7 @@ namespace zmq
// List of messages to send to the peer during the connection
// initiation phase.
- typedef std::vector < ::zmq_msg_t> to_send_t;
+ typedef std::vector <msg_t> to_send_t;
to_send_t to_send;
// True if peer's identity was already received.