diff options
55 files changed, 606 insertions, 474 deletions
diff --git a/include/zmq.h b/include/zmq.h index 70197c9..2d01e24 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -121,34 +121,7 @@ ZMQ_EXPORT const char *zmq_strerror (int errnum); /* 0MQ message definition. */ /******************************************************************************/ -/* Maximal size of "Very Small Message". VSMs are passed by value */ -/* to avoid excessive memory allocation/deallocation. */ -/* If VMSs larger than 255 bytes are required, type of 'vsm_size' */ -/* field in zmq_msg_t structure should be modified accordingly. */ -#define ZMQ_MAX_VSM_SIZE 30 - -/* Message types. These integers may be stored in 'content' member of the */ -/* message instead of regular pointer to the data. */ -#define ZMQ_DELIMITER 31 -#define ZMQ_VSM 32 - -/* Message flags. ZMQ_MSG_SHARED is strictly speaking not a message flag */ -/* (it has no equivalent in the wire format), however, making it a flag */ -/* allows us to pack the stucture tigher and thus improve performance. */ -#define ZMQ_MSG_MORE 1 -#define ZMQ_MSG_SHARED 128 -#define ZMQ_MSG_MASK 129 /* Merges all the flags */ - -/* A message. Note that 'content' is not a pointer to the raw data. */ -/* Rather it is pointer to zmq::msg_content_t structure */ -/* (see src/msg_content.hpp for its definition). */ -typedef struct -{ - void *content; - unsigned char flags; - unsigned char vsm_size; - unsigned char vsm_data [ZMQ_MAX_VSM_SIZE]; -} zmq_msg_t; +typedef unsigned char zmq_msg_t [32]; typedef void (zmq_free_fn) (void *data, void *hint); diff --git a/src/config.hpp b/src/config.hpp index 3df66c7..dff3f87 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -36,6 +36,10 @@ namespace zmq // memory allocation by approximately 99.6% message_pipe_granularity = 256, + // Size in bytes of the largest message that is still copied around + // rather than being reference-counted. + max_vsm_size = 29, + // Determines how often does socket poll for new commands when it // still has unprocessed messages to handle. Thus, if it is set to 100, // socket will process 100 inbound messages before doing the poll. diff --git a/src/ctx.cpp b/src/ctx.cpp index 2758729..fb5420d 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -26,8 +26,9 @@ #include "io_thread.hpp" #include "platform.hpp" #include "reaper.hpp" -#include "err.hpp" #include "pipe.hpp" +#include "err.hpp" +#include "msg.hpp" #if defined ZMQ_HAVE_WINDOWS #include "windows.h" @@ -304,10 +305,10 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) void zmq::ctx_t::log (const char *format_, va_list args_) { // Create the log message. - zmq_msg_t msg; - int rc = zmq_msg_init_size (&msg, strlen (format_) + 1); - zmq_assert (rc == 0); - memcpy (zmq_msg_data (&msg), format_, zmq_msg_size (&msg)); + msg_t msg; + int rc = msg.init_size (strlen (format_) + 1); + errno_assert (rc == 0); + memcpy (msg.data (), format_, msg.size ()); // At this point we migrate the log socket to the current thread. // We rely on mutex for executing the memory barrier. @@ -316,7 +317,8 @@ void zmq::ctx_t::log (const char *format_, va_list args_) log_socket->send (&msg, 0); log_sync.unlock (); - zmq_msg_close (&msg); + rc = msg.close (); + errno_assert (rc == 0); } diff --git a/src/ctx.hpp b/src/ctx.hpp index 33d5dad..7d865fa 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -26,8 +26,6 @@ #include <string> #include <stdarg.h> -#include "../include/zmq.h" - #include "mailbox.hpp" #include "semaphore.hpp" #include "ypipe.hpp" diff --git a/src/decoder.cpp b/src/decoder.cpp index efb39e8..bcf5974 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -31,7 +31,8 @@ zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) : destination (NULL), maxmsgsize (maxmsgsize_) { - zmq_msg_init (&in_progress); + int rc = in_progress.init (); + errno_assert (rc == 0); // At the beginning, read one byte and go to one_byte_size_ready state. next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready); @@ -39,7 +40,8 @@ zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) : zmq::decoder_t::~decoder_t () { - zmq_msg_close (&in_progress); + int rc = in_progress.close (); + errno_assert (rc == 0); } void zmq::decoder_t::set_inout (i_inout *destination_) @@ -71,9 +73,9 @@ bool zmq::decoder_t::one_byte_size_ready () errno = ENOMEM; } else - rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1); + rc = in_progress.init_size (*tmpbuf - 1); if (rc != 0 && errno == ENOMEM) { - rc = zmq_msg_init (&in_progress); + rc = in_progress.init (); errno_assert (rc == 0); decoding_error (); return false; @@ -106,9 +108,9 @@ bool zmq::decoder_t::eight_byte_size_ready () errno = ENOMEM; } else - rc = zmq_msg_init_size (&in_progress, size - 1); + rc = in_progress.init_size (size - 1); if (rc != 0 && errno == ENOMEM) { - rc = zmq_msg_init (&in_progress); + rc = in_progress.init (); errno_assert (rc == 0); decoding_error (); return false; @@ -122,9 +124,9 @@ bool zmq::decoder_t::eight_byte_size_ready () bool zmq::decoder_t::flags_ready () { // Store the flags from the wire into the message structure. - in_progress.flags = tmpbuf [0]; + in_progress.set_flags (tmpbuf [0]); - next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), + next_step (in_progress.data (), in_progress.size (), &decoder_t::message_ready); return true; diff --git a/src/decoder.hpp b/src/decoder.hpp index 23806a3..114ecef 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -27,10 +27,9 @@ #include <algorithm> #include "err.hpp" +#include "msg.hpp" #include "stdint.hpp" -#include "../include/zmq.h" - namespace zmq { @@ -196,7 +195,7 @@ namespace zmq struct i_inout *destination; unsigned char tmpbuf [8]; - ::zmq_msg_t in_progress; + msg_t in_progress; int64_t maxmsgsize; diff --git a/src/dist.cpp b/src/dist.cpp index 9d50368..093da79 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -18,8 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "dist.hpp" #include "pipe.hpp" #include "err.hpp" @@ -89,10 +87,10 @@ void zmq::dist_t::activated (writer_t *pipe_) active++; } -int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) +int zmq::dist_t::send (msg_t *msg_, int flags_) { // Is this end of a multipart message? - bool msg_more = msg_->flags & ZMQ_MSG_MORE; + bool msg_more = msg_->flags () & msg_t::more; // Push the message to active pipes. distribute (msg_, flags_); @@ -106,63 +104,33 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) return 0; } -void zmq::dist_t::distribute (zmq_msg_t *msg_, int flags_) +void zmq::dist_t::distribute (msg_t *msg_, int flags_) { // If there are no active pipes available, simply drop the message. if (active == 0) { - int rc = zmq_msg_close (msg_); - zmq_assert (rc == 0); - rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); - return; - } - - msg_content_t *content = (msg_content_t*) msg_->content; - - // For VSMs the copying is straighforward. - if (content == (msg_content_t*) ZMQ_VSM) { - for (pipes_t::size_type i = 0; i < active;) - if (write (pipes [i], msg_)) - i++; - int rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); - return; - } - - // Optimisation for the case when there's only a single pipe - // to send the message to - no refcount adjustment i.e. no atomic - // operations are needed. - if (active == 1) { - if (!write (pipes [0], msg_)) { - int rc = zmq_msg_close (msg_); - zmq_assert (rc == 0); - } - int rc = zmq_msg_init (msg_); + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); zmq_assert (rc == 0); return; } - // There are at least 2 destinations for the message. That means we have - // to deal with reference counting. First add N-1 references to - // the content (we are holding one reference anyway, that's why -1). - if (msg_->flags & ZMQ_MSG_SHARED) - content->refcnt.add (active - 1); - else { - content->refcnt.set (active); - msg_->flags |= ZMQ_MSG_SHARED; - } + // Add active-1 references to the message. We already hold one reference, + // that's why -1. + msg_->add_refs (active - 1); - // Push the message to all destinations. + // Push copy of the message to each active pipe. for (pipes_t::size_type i = 0; i < active;) { if (!write (pipes [i], msg_)) - content->refcnt.sub (1); + msg_->rm_refs (1); else i++; } - // Detach the original message from the data buffer. - int rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); + // Detach the original message from the data buffer. Note that we don't + // close the message. That's because we've already used all the references. + int rc = msg_->init (); + errno_assert (rc == 0); } bool zmq::dist_t::has_out () @@ -170,14 +138,14 @@ bool zmq::dist_t::has_out () return true; } -bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_) +bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) { if (!pipe_->write (msg_)) { active--; pipes.swap (pipes.index (pipe_), active); return false; } - if (!(msg_->flags & ZMQ_MSG_MORE)) + if (!(msg_->flags () & msg_t::more)) pipe_->flush (); return true; } diff --git a/src/dist.hpp b/src/dist.hpp index ad9767a..ea05305 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -40,7 +40,7 @@ namespace zmq void attach (writer_t *pipe_); void terminate (); - int send (zmq_msg_t *msg_, int flags_); + int send (class msg_t *msg_, int flags_); bool has_out (); // i_writer_events interface implementation. @@ -51,10 +51,10 @@ namespace zmq // Write the message to the pipe. Make the pipe inactive if writing // fails. In such a case false is returned. - bool write (class writer_t *pipe_, zmq_msg_t *msg_); + bool write (class writer_t *pipe_, class msg_t *msg_); // Put the message to all active pipes. - void distribute (zmq_msg_t *msg_, int flags_); + void distribute (class msg_t *msg_, int flags_); // Plug in all the delayed pipes. void clear_new_pipes (); diff --git a/src/encoder.cpp b/src/encoder.cpp index 88e1dff..a42f06f 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -26,7 +26,8 @@ zmq::encoder_t::encoder_t (size_t bufsize_) : encoder_base_t <encoder_t> (bufsize_), source (NULL) { - zmq_msg_init (&in_progress); + int rc = in_progress.init (); + errno_assert (rc == 0); // Write 0 bytes to the batch and go to message_ready state. next_step (NULL, 0, &encoder_t::message_ready, true); @@ -34,7 +35,8 @@ zmq::encoder_t::encoder_t (size_t bufsize_) : zmq::encoder_t::~encoder_t () { - zmq_msg_close (&in_progress); + int rc = in_progress.close (); + errno_assert (rc == 0); } void zmq::encoder_t::set_inout (i_inout *source_) @@ -45,7 +47,7 @@ void zmq::encoder_t::set_inout (i_inout *source_) bool zmq::encoder_t::size_ready () { // Write message body into the buffer. - next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), + next_step (in_progress.data (), in_progress.size (), &encoder_t::message_ready, false); return true; } @@ -53,19 +55,21 @@ bool zmq::encoder_t::size_ready () bool zmq::encoder_t::message_ready () { // Destroy content of the old message. - zmq_msg_close (&in_progress); + int rc = in_progress.close (); + errno_assert (rc == 0); // Read new message. If there is none, return false. // Note that new state is set only if write is successful. That way // unsuccessful write will cause retry on the next state machine // invocation. if (!source || !source->read (&in_progress)) { - zmq_msg_init (&in_progress); + rc = in_progress.init (); + errno_assert (rc == 0); return false; } // Get the message size. - size_t size = zmq_msg_size (&in_progress); + size_t size = in_progress.size (); // Account for the 'flags' byte. size++; @@ -75,16 +79,16 @@ bool zmq::encoder_t::message_ready () // message size. In both cases 'flags' field follows. if (size < 255) { tmpbuf [0] = (unsigned char) size; - tmpbuf [1] = (in_progress.flags & ~ZMQ_MSG_SHARED); + tmpbuf [1] = (in_progress.flags () & ~msg_t::shared); next_step (tmpbuf, 2, &encoder_t::size_ready, - !(in_progress.flags & ZMQ_MSG_MORE)); + !(in_progress.flags () & msg_t::more)); } else { tmpbuf [0] = 0xff; put_uint64 (tmpbuf + 1, size); - tmpbuf [9] = (in_progress.flags & ~ZMQ_MSG_SHARED); + tmpbuf [9] = (in_progress.flags () & ~msg_t::shared); next_step (tmpbuf, 10, &encoder_t::size_ready, - !(in_progress.flags & ZMQ_MSG_MORE)); + !(in_progress.flags () & msg_t::more)); } return true; } diff --git a/src/encoder.hpp b/src/encoder.hpp index 918ec2b..617b65b 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -27,8 +27,7 @@ #include <algorithm> #include "err.hpp" - -#include "../include/zmq.h" +#include "msg.hpp" namespace zmq { @@ -172,7 +171,7 @@ namespace zmq bool message_ready (); struct i_inout *source; - ::zmq_msg_t in_progress; + msg_t in_progress; unsigned char tmpbuf [10]; encoder_t (const encoder_t&); diff --git a/src/err.cpp b/src/err.cpp index 8761c22..87a0006 100644 --- a/src/err.cpp +++ b/src/err.cpp @@ -18,8 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "err.hpp" #include "platform.hpp" diff --git a/src/err.hpp b/src/err.hpp index 3ffd99d..6289a08 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -21,6 +21,9 @@ #ifndef __ZMQ_ERR_HPP_INCLUDED__ #define __ZMQ_ERR_HPP_INCLUDED__ +// 0MQ-specific error codes are defined in zmq.h +#include "../include/zmq.h" + #include <assert.h> #include <errno.h> #include <string.h> @@ -18,12 +18,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "fq.hpp" #include "pipe.hpp" #include "err.hpp" #include "own.hpp" +#include "msg.hpp" zmq::fq_t::fq_t (own_t *sink_) : active (0), @@ -95,10 +94,11 @@ void zmq::fq_t::activated (reader_t *pipe_) active++; } -int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_) +int zmq::fq_t::recv (msg_t *msg_, int flags_) { // Deallocate old content of the message. - zmq_msg_close (msg_); + int rc = msg_->close (); + errno_assert (rc == 0); // Round-robin over the pipes to get the next message. for (int count = active; count != 0; count--) { @@ -116,7 +116,7 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_) // and replaced by another active pipe. Thus we don't have to increase // the 'current' pointer. if (fetched) { - more = msg_->flags & ZMQ_MSG_MORE; + more = msg_->flags () & msg_t::more; if (!more) { current++; if (current >= active) @@ -134,7 +134,8 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_) // No message is available. Initialise the output parameter // to be a 0-byte message. - zmq_msg_init (msg_); + rc = msg_->init (); + errno_assert (rc == 0); errno = EAGAIN; return -1; } @@ -23,6 +23,7 @@ #include "array.hpp" #include "pipe.hpp" +#include "msg.hpp" namespace zmq { @@ -40,7 +41,7 @@ namespace zmq void attach (reader_t *pipe_); void terminate (); - int recv (zmq_msg_t *msg_, int flags_); + int recv (msg_t *msg_, int flags_); bool has_in (); // i_reader_events implementation. diff --git a/src/i_inout.hpp b/src/i_inout.hpp index 057b46c..3f8e8e0 100644 --- a/src/i_inout.hpp +++ b/src/i_inout.hpp @@ -21,8 +21,7 @@ #ifndef __ZMQ_I_INOUT_HPP_INCLUDED__ #define __ZMQ_I_INOUT_HPP_INCLUDED__ -#include "../include/zmq.h" - +#include "msg.hpp" #include "stdint.hpp" namespace zmq @@ -33,10 +32,10 @@ namespace zmq virtual ~i_inout () {} // Engine asks for a message to send to the network. - virtual bool read (::zmq_msg_t *msg_) = 0; + virtual bool read (msg_t *msg_) = 0; // Engine received message from the network and sends it further on. - virtual bool write (::zmq_msg_t *msg_) = 0; + virtual bool write (msg_t *msg_) = 0; // Flush all the previously written messages. virtual void flush () = 0; diff --git a/src/io_thread.cpp b/src/io_thread.cpp index be52bdd..9678392 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -20,8 +20,6 @@ #include <new> -#include "../include/zmq.h" - #include "io_thread.hpp" #include "platform.hpp" #include "err.hpp" @@ -23,11 +23,9 @@ #include <stdlib.h> #include <string> -#include "../include/zmq.h" - #include "ip.hpp" -#include "platform.hpp" #include "err.hpp" +#include "platform.hpp" #include "stdint.hpp" #if defined ZMQ_HAVE_SOLARIS @@ -18,12 +18,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "../include/zmq.h" - #include "lb.hpp" #include "pipe.hpp" #include "err.hpp" #include "own.hpp" +#include "msg.hpp" zmq::lb_t::lb_t (own_t *sink_) : active (0), @@ -93,26 +92,26 @@ void zmq::lb_t::activated (writer_t *pipe_) active++; } -int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) +int zmq::lb_t::send (msg_t *msg_, int flags_) { // Drop the message if required. If we are at the end of the message // switch back to non-dropping mode. if (dropping) { - more = msg_->flags & ZMQ_MSG_MORE; + more = msg_->flags () & msg_t::more; if (!more) dropping = false; - int rc = zmq_msg_close (msg_); + int rc = msg_->close (); errno_assert (rc == 0); - rc = zmq_msg_init (msg_); |