summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/zmq.h29
-rw-r--r--src/config.hpp4
-rw-r--r--src/ctx.cpp14
-rw-r--r--src/ctx.hpp2
-rw-r--r--src/decoder.cpp18
-rw-r--r--src/decoder.hpp5
-rw-r--r--src/dist.cpp66
-rw-r--r--src/dist.hpp6
-rw-r--r--src/encoder.cpp24
-rw-r--r--src/encoder.hpp5
-rw-r--r--src/err.cpp2
-rw-r--r--src/err.hpp3
-rw-r--r--src/fq.cpp13
-rw-r--r--src/fq.hpp3
-rw-r--r--src/i_inout.hpp7
-rw-r--r--src/io_thread.cpp2
-rw-r--r--src/ip.cpp4
-rw-r--r--src/lb.cpp28
-rw-r--r--src/lb.hpp2
-rw-r--r--src/msg.cpp249
-rw-r--r--src/msg.hpp107
-rw-r--r--src/object.hpp2
-rw-r--r--src/options.cpp2
-rw-r--r--src/pair.cpp25
-rw-r--r--src/pair.hpp4
-rw-r--r--src/pipe.cpp49
-rw-r--r--src/pipe.hpp13
-rw-r--r--src/pub.cpp1
-rw-r--r--src/pull.cpp5
-rw-r--r--src/pull.hpp2
-rw-r--r--src/push.cpp7
-rw-r--r--src/push.hpp2
-rw-r--r--src/rep.cpp15
-rw-r--r--src/rep.hpp4
-rw-r--r--src/req.cpp23
-rw-r--r--src/req.hpp4
-rw-r--r--src/session.cpp17
-rw-r--r--src/session.hpp4
-rw-r--r--src/socket_base.cpp29
-rw-r--r--src/socket_base.hpp10
-rw-r--r--src/sub.cpp17
-rw-r--r--src/sub.hpp2
-rw-r--r--src/tcp_connecter.cpp2
-rw-r--r--src/tcp_listener.cpp2
-rw-r--r--src/xpub.cpp9
-rw-r--r--src/xpub.hpp4
-rw-r--r--src/xrep.cpp69
-rw-r--r--src/xrep.hpp7
-rw-r--r--src/xreq.cpp7
-rw-r--r--src/xreq.hpp4
-rw-r--r--src/xsub.cpp41
-rw-r--r--src/xsub.hpp11
-rw-r--r--src/zmq.cpp52
-rw-r--r--src/zmq_init.cpp29
-rw-r--r--src/zmq_init.hpp13
55 files changed, 606 insertions, 474 deletions
diff --git a/include/zmq.h b/include/zmq.h
index 70197c9..2d01e24 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -121,34 +121,7 @@ ZMQ_EXPORT const char *zmq_strerror (int errnum);
/* 0MQ message definition. */
/******************************************************************************/
-/* Maximal size of "Very Small Message". VSMs are passed by value */
-/* to avoid excessive memory allocation/deallocation. */
-/* If VMSs larger than 255 bytes are required, type of 'vsm_size' */
-/* field in zmq_msg_t structure should be modified accordingly. */
-#define ZMQ_MAX_VSM_SIZE 30
-
-/* Message types. These integers may be stored in 'content' member of the */
-/* message instead of regular pointer to the data. */
-#define ZMQ_DELIMITER 31
-#define ZMQ_VSM 32
-
-/* Message flags. ZMQ_MSG_SHARED is strictly speaking not a message flag */
-/* (it has no equivalent in the wire format), however, making it a flag */
-/* allows us to pack the stucture tigher and thus improve performance. */
-#define ZMQ_MSG_MORE 1
-#define ZMQ_MSG_SHARED 128
-#define ZMQ_MSG_MASK 129 /* Merges all the flags */
-
-/* A message. Note that 'content' is not a pointer to the raw data. */
-/* Rather it is pointer to zmq::msg_content_t structure */
-/* (see src/msg_content.hpp for its definition). */
-typedef struct
-{
- void *content;
- unsigned char flags;
- unsigned char vsm_size;
- unsigned char vsm_data [ZMQ_MAX_VSM_SIZE];
-} zmq_msg_t;
+typedef unsigned char zmq_msg_t [32];
typedef void (zmq_free_fn) (void *data, void *hint);
diff --git a/src/config.hpp b/src/config.hpp
index 3df66c7..dff3f87 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -36,6 +36,10 @@ namespace zmq
// memory allocation by approximately 99.6%
message_pipe_granularity = 256,
+ // Size in bytes of the largest message that is still copied around
+ // rather than being reference-counted.
+ max_vsm_size = 29,
+
// Determines how often does socket poll for new commands when it
// still has unprocessed messages to handle. Thus, if it is set to 100,
// socket will process 100 inbound messages before doing the poll.
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 2758729..fb5420d 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -26,8 +26,9 @@
#include "io_thread.hpp"
#include "platform.hpp"
#include "reaper.hpp"
-#include "err.hpp"
#include "pipe.hpp"
+#include "err.hpp"
+#include "msg.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.h"
@@ -304,10 +305,10 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
void zmq::ctx_t::log (const char *format_, va_list args_)
{
// Create the log message.
- zmq_msg_t msg;
- int rc = zmq_msg_init_size (&msg, strlen (format_) + 1);
- zmq_assert (rc == 0);
- memcpy (zmq_msg_data (&msg), format_, zmq_msg_size (&msg));
+ msg_t msg;
+ int rc = msg.init_size (strlen (format_) + 1);
+ errno_assert (rc == 0);
+ memcpy (msg.data (), format_, msg.size ());
// At this point we migrate the log socket to the current thread.
// We rely on mutex for executing the memory barrier.
@@ -316,7 +317,8 @@ void zmq::ctx_t::log (const char *format_, va_list args_)
log_socket->send (&msg, 0);
log_sync.unlock ();
- zmq_msg_close (&msg);
+ rc = msg.close ();
+ errno_assert (rc == 0);
}
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 33d5dad..7d865fa 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -26,8 +26,6 @@
#include <string>
#include <stdarg.h>
-#include "../include/zmq.h"
-
#include "mailbox.hpp"
#include "semaphore.hpp"
#include "ypipe.hpp"
diff --git a/src/decoder.cpp b/src/decoder.cpp
index efb39e8..bcf5974 100644
--- a/src/decoder.cpp
+++ b/src/decoder.cpp
@@ -31,7 +31,8 @@ zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
destination (NULL),
maxmsgsize (maxmsgsize_)
{
- zmq_msg_init (&in_progress);
+ int rc = in_progress.init ();
+ errno_assert (rc == 0);
// At the beginning, read one byte and go to one_byte_size_ready state.
next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
@@ -39,7 +40,8 @@ zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
zmq::decoder_t::~decoder_t ()
{
- zmq_msg_close (&in_progress);
+ int rc = in_progress.close ();
+ errno_assert (rc == 0);
}
void zmq::decoder_t::set_inout (i_inout *destination_)
@@ -71,9 +73,9 @@ bool zmq::decoder_t::one_byte_size_ready ()
errno = ENOMEM;
}
else
- rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1);
+ rc = in_progress.init_size (*tmpbuf - 1);
if (rc != 0 && errno == ENOMEM) {
- rc = zmq_msg_init (&in_progress);
+ rc = in_progress.init ();
errno_assert (rc == 0);
decoding_error ();
return false;
@@ -106,9 +108,9 @@ bool zmq::decoder_t::eight_byte_size_ready ()
errno = ENOMEM;
}
else
- rc = zmq_msg_init_size (&in_progress, size - 1);
+ rc = in_progress.init_size (size - 1);
if (rc != 0 && errno == ENOMEM) {
- rc = zmq_msg_init (&in_progress);
+ rc = in_progress.init ();
errno_assert (rc == 0);
decoding_error ();
return false;
@@ -122,9 +124,9 @@ bool zmq::decoder_t::eight_byte_size_ready ()
bool zmq::decoder_t::flags_ready ()
{
// Store the flags from the wire into the message structure.
- in_progress.flags = tmpbuf [0];
+ in_progress.set_flags (tmpbuf [0]);
- next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
+ next_step (in_progress.data (), in_progress.size (),
&decoder_t::message_ready);
return true;
diff --git a/src/decoder.hpp b/src/decoder.hpp
index 23806a3..114ecef 100644
--- a/src/decoder.hpp
+++ b/src/decoder.hpp
@@ -27,10 +27,9 @@
#include <algorithm>
#include "err.hpp"
+#include "msg.hpp"
#include "stdint.hpp"
-#include "../include/zmq.h"
-
namespace zmq
{
@@ -196,7 +195,7 @@ namespace zmq
struct i_inout *destination;
unsigned char tmpbuf [8];
- ::zmq_msg_t in_progress;
+ msg_t in_progress;
int64_t maxmsgsize;
diff --git a/src/dist.cpp b/src/dist.cpp
index 9d50368..093da79 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -18,8 +18,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "dist.hpp"
#include "pipe.hpp"
#include "err.hpp"
@@ -89,10 +87,10 @@ void zmq::dist_t::activated (writer_t *pipe_)
active++;
}
-int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
+int zmq::dist_t::send (msg_t *msg_, int flags_)
{
// Is this end of a multipart message?
- bool msg_more = msg_->flags & ZMQ_MSG_MORE;
+ bool msg_more = msg_->flags () & msg_t::more;
// Push the message to active pipes.
distribute (msg_, flags_);
@@ -106,63 +104,33 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
return 0;
}
-void zmq::dist_t::distribute (zmq_msg_t *msg_, int flags_)
+void zmq::dist_t::distribute (msg_t *msg_, int flags_)
{
// If there are no active pipes available, simply drop the message.
if (active == 0) {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return;
- }
-
- msg_content_t *content = (msg_content_t*) msg_->content;
-
- // For VSMs the copying is straighforward.
- if (content == (msg_content_t*) ZMQ_VSM) {
- for (pipes_t::size_type i = 0; i < active;)
- if (write (pipes [i], msg_))
- i++;
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return;
- }
-
- // Optimisation for the case when there's only a single pipe
- // to send the message to - no refcount adjustment i.e. no atomic
- // operations are needed.
- if (active == 1) {
- if (!write (pipes [0], msg_)) {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- }
- int rc = zmq_msg_init (msg_);
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ rc = msg_->init ();
zmq_assert (rc == 0);
return;
}
- // There are at least 2 destinations for the message. That means we have
- // to deal with reference counting. First add N-1 references to
- // the content (we are holding one reference anyway, that's why -1).
- if (msg_->flags & ZMQ_MSG_SHARED)
- content->refcnt.add (active - 1);
- else {
- content->refcnt.set (active);
- msg_->flags |= ZMQ_MSG_SHARED;
- }
+ // Add active-1 references to the message. We already hold one reference,
+ // that's why -1.
+ msg_->add_refs (active - 1);
- // Push the message to all destinations.
+ // Push copy of the message to each active pipe.
for (pipes_t::size_type i = 0; i < active;) {
if (!write (pipes [i], msg_))
- content->refcnt.sub (1);
+ msg_->rm_refs (1);
else
i++;
}
- // Detach the original message from the data buffer.
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
+ // Detach the original message from the data buffer. Note that we don't
+ // close the message. That's because we've already used all the references.
+ int rc = msg_->init ();
+ errno_assert (rc == 0);
}
bool zmq::dist_t::has_out ()
@@ -170,14 +138,14 @@ bool zmq::dist_t::has_out ()
return true;
}
-bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_)
+bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_)
{
if (!pipe_->write (msg_)) {
active--;
pipes.swap (pipes.index (pipe_), active);
return false;
}
- if (!(msg_->flags & ZMQ_MSG_MORE))
+ if (!(msg_->flags () & msg_t::more))
pipe_->flush ();
return true;
}
diff --git a/src/dist.hpp b/src/dist.hpp
index ad9767a..ea05305 100644
--- a/src/dist.hpp
+++ b/src/dist.hpp
@@ -40,7 +40,7 @@ namespace zmq
void attach (writer_t *pipe_);
void terminate ();
- int send (zmq_msg_t *msg_, int flags_);
+ int send (class msg_t *msg_, int flags_);
bool has_out ();
// i_writer_events interface implementation.
@@ -51,10 +51,10 @@ namespace zmq
// Write the message to the pipe. Make the pipe inactive if writing
// fails. In such a case false is returned.
- bool write (class writer_t *pipe_, zmq_msg_t *msg_);
+ bool write (class writer_t *pipe_, class msg_t *msg_);
// Put the message to all active pipes.
- void distribute (zmq_msg_t *msg_, int flags_);
+ void distribute (class msg_t *msg_, int flags_);
// Plug in all the delayed pipes.
void clear_new_pipes ();
diff --git a/src/encoder.cpp b/src/encoder.cpp
index 88e1dff..a42f06f 100644
--- a/src/encoder.cpp
+++ b/src/encoder.cpp
@@ -26,7 +26,8 @@ zmq::encoder_t::encoder_t (size_t bufsize_) :
encoder_base_t <encoder_t> (bufsize_),
source (NULL)
{
- zmq_msg_init (&in_progress);
+ int rc = in_progress.init ();
+ errno_assert (rc == 0);
// Write 0 bytes to the batch and go to message_ready state.
next_step (NULL, 0, &encoder_t::message_ready, true);
@@ -34,7 +35,8 @@ zmq::encoder_t::encoder_t (size_t bufsize_) :
zmq::encoder_t::~encoder_t ()
{
- zmq_msg_close (&in_progress);
+ int rc = in_progress.close ();
+ errno_assert (rc == 0);
}
void zmq::encoder_t::set_inout (i_inout *source_)
@@ -45,7 +47,7 @@ void zmq::encoder_t::set_inout (i_inout *source_)
bool zmq::encoder_t::size_ready ()
{
// Write message body into the buffer.
- next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
+ next_step (in_progress.data (), in_progress.size (),
&encoder_t::message_ready, false);
return true;
}
@@ -53,19 +55,21 @@ bool zmq::encoder_t::size_ready ()
bool zmq::encoder_t::message_ready ()
{
// Destroy content of the old message.
- zmq_msg_close (&in_progress);
+ int rc = in_progress.close ();
+ errno_assert (rc == 0);
// Read new message. If there is none, return false.
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
if (!source || !source->read (&in_progress)) {
- zmq_msg_init (&in_progress);
+ rc = in_progress.init ();
+ errno_assert (rc == 0);
return false;
}
// Get the message size.
- size_t size = zmq_msg_size (&in_progress);
+ size_t size = in_progress.size ();
// Account for the 'flags' byte.
size++;
@@ -75,16 +79,16 @@ bool zmq::encoder_t::message_ready ()
// message size. In both cases 'flags' field follows.
if (size < 255) {
tmpbuf [0] = (unsigned char) size;
- tmpbuf [1] = (in_progress.flags & ~ZMQ_MSG_SHARED);
+ tmpbuf [1] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 2, &encoder_t::size_ready,
- !(in_progress.flags & ZMQ_MSG_MORE));
+ !(in_progress.flags () & msg_t::more));
}
else {
tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size);
- tmpbuf [9] = (in_progress.flags & ~ZMQ_MSG_SHARED);
+ tmpbuf [9] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 10, &encoder_t::size_ready,
- !(in_progress.flags & ZMQ_MSG_MORE));
+ !(in_progress.flags () & msg_t::more));
}
return true;
}
diff --git a/src/encoder.hpp b/src/encoder.hpp
index 918ec2b..617b65b 100644
--- a/src/encoder.hpp
+++ b/src/encoder.hpp
@@ -27,8 +27,7 @@
#include <algorithm>
#include "err.hpp"
-
-#include "../include/zmq.h"
+#include "msg.hpp"
namespace zmq
{
@@ -172,7 +171,7 @@ namespace zmq
bool message_ready ();
struct i_inout *source;
- ::zmq_msg_t in_progress;
+ msg_t in_progress;
unsigned char tmpbuf [10];
encoder_t (const encoder_t&);
diff --git a/src/err.cpp b/src/err.cpp
index 8761c22..87a0006 100644
--- a/src/err.cpp
+++ b/src/err.cpp
@@ -18,8 +18,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "err.hpp"
#include "platform.hpp"
diff --git a/src/err.hpp b/src/err.hpp
index 3ffd99d..6289a08 100644
--- a/src/err.hpp
+++ b/src/err.hpp
@@ -21,6 +21,9 @@
#ifndef __ZMQ_ERR_HPP_INCLUDED__
#define __ZMQ_ERR_HPP_INCLUDED__
+// 0MQ-specific error codes are defined in zmq.h
+#include "../include/zmq.h"
+
#include <assert.h>
#include <errno.h>
#include <string.h>
diff --git a/src/fq.cpp b/src/fq.cpp
index 36fd435..20dc769 100644
--- a/src/fq.cpp
+++ b/src/fq.cpp
@@ -18,12 +18,11 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "fq.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "own.hpp"
+#include "msg.hpp"
zmq::fq_t::fq_t (own_t *sink_) :
active (0),
@@ -95,10 +94,11 @@ void zmq::fq_t::activated (reader_t *pipe_)
active++;
}
-int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
+int zmq::fq_t::recv (msg_t *msg_, int flags_)
{
// Deallocate old content of the message.
- zmq_msg_close (msg_);
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
// Round-robin over the pipes to get the next message.
for (int count = active; count != 0; count--) {
@@ -116,7 +116,7 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
// and replaced by another active pipe. Thus we don't have to increase
// the 'current' pointer.
if (fetched) {
- more = msg_->flags & ZMQ_MSG_MORE;
+ more = msg_->flags () & msg_t::more;
if (!more) {
current++;
if (current >= active)
@@ -134,7 +134,8 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
// No message is available. Initialise the output parameter
// to be a 0-byte message.
- zmq_msg_init (msg_);
+ rc = msg_->init ();
+ errno_assert (rc == 0);
errno = EAGAIN;
return -1;
}
diff --git a/src/fq.hpp b/src/fq.hpp
index 8c6c95c..c35d458 100644
--- a/src/fq.hpp
+++ b/src/fq.hpp
@@ -23,6 +23,7 @@
#include "array.hpp"
#include "pipe.hpp"
+#include "msg.hpp"
namespace zmq
{
@@ -40,7 +41,7 @@ namespace zmq
void attach (reader_t *pipe_);
void terminate ();
- int recv (zmq_msg_t *msg_, int flags_);
+ int recv (msg_t *msg_, int flags_);
bool has_in ();
// i_reader_events implementation.
diff --git a/src/i_inout.hpp b/src/i_inout.hpp
index 057b46c..3f8e8e0 100644
--- a/src/i_inout.hpp
+++ b/src/i_inout.hpp
@@ -21,8 +21,7 @@
#ifndef __ZMQ_I_INOUT_HPP_INCLUDED__
#define __ZMQ_I_INOUT_HPP_INCLUDED__
-#include "../include/zmq.h"
-
+#include "msg.hpp"
#include "stdint.hpp"
namespace zmq
@@ -33,10 +32,10 @@ namespace zmq
virtual ~i_inout () {}
// Engine asks for a message to send to the network.
- virtual bool read (::zmq_msg_t *msg_) = 0;
+ virtual bool read (msg_t *msg_) = 0;
// Engine received message from the network and sends it further on.
- virtual bool write (::zmq_msg_t *msg_) = 0;
+ virtual bool write (msg_t *msg_) = 0;
// Flush all the previously written messages.
virtual void flush () = 0;
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index be52bdd..9678392 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -20,8 +20,6 @@
#include <new>
-#include "../include/zmq.h"
-
#include "io_thread.hpp"
#include "platform.hpp"
#include "err.hpp"
diff --git a/src/ip.cpp b/src/ip.cpp
index a63a97d..3591f02 100644
--- a/src/ip.cpp
+++ b/src/ip.cpp
@@ -23,11 +23,9 @@
#include <stdlib.h>
#include <string>
-#include "../include/zmq.h"
-
#include "ip.hpp"
-#include "platform.hpp"
#include "err.hpp"
+#include "platform.hpp"
#include "stdint.hpp"
#if defined ZMQ_HAVE_SOLARIS
diff --git a/src/lb.cpp b/src/lb.cpp
index 95af4a1..e81df3a 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -18,12 +18,11 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "lb.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "own.hpp"
+#include "msg.hpp"
zmq::lb_t::lb_t (own_t *sink_) :
active (0),
@@ -93,26 +92,26 @@ void zmq::lb_t::activated (writer_t *pipe_)
active++;
}
-int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
+int zmq::lb_t::send (msg_t *msg_, int flags_)
{
// Drop the message if required. If we are at the end of the message
// switch back to non-dropping mode.
if (dropping) {
- more = msg_->flags & ZMQ_MSG_MORE;
+ more = msg_->flags () & msg_t::more;
if (!more)
dropping = false;
- int rc = zmq_msg_close (msg_);
+ int rc = msg_->close ();
errno_assert (rc == 0);
- rc = zmq_msg_init (msg_);