diff options
Diffstat (limited to 'src')
54 files changed, 605 insertions, 446 deletions
| diff --git a/src/config.hpp b/src/config.hpp index 3df66c7..dff3f87 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -36,6 +36,10 @@ namespace zmq          //  memory allocation by approximately 99.6%          message_pipe_granularity = 256, +        //  Size in bytes of the largest message that is still copied around +        //  rather than being reference-counted. +        max_vsm_size = 29, +          //  Determines how often does socket poll for new commands when it          //  still has unprocessed messages to handle. Thus, if it is set to 100,          //  socket will process 100 inbound messages before doing the poll. diff --git a/src/ctx.cpp b/src/ctx.cpp index 2758729..fb5420d 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -26,8 +26,9 @@  #include "io_thread.hpp"  #include "platform.hpp"  #include "reaper.hpp" -#include "err.hpp"  #include "pipe.hpp" +#include "err.hpp" +#include "msg.hpp"  #if defined ZMQ_HAVE_WINDOWS  #include "windows.h" @@ -304,10 +305,10 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)  void zmq::ctx_t::log (const char *format_, va_list args_)  {      //  Create the log message. -    zmq_msg_t msg; -    int rc = zmq_msg_init_size (&msg, strlen (format_) + 1); -    zmq_assert (rc == 0); -    memcpy (zmq_msg_data (&msg), format_, zmq_msg_size (&msg)); +    msg_t msg; +    int rc = msg.init_size (strlen (format_) + 1); +    errno_assert (rc == 0); +    memcpy (msg.data (), format_, msg.size ());      //  At this  point we migrate the log socket to the current thread.      //  We rely on mutex for executing the memory barrier. @@ -316,7 +317,8 @@ void zmq::ctx_t::log (const char *format_, va_list args_)          log_socket->send (&msg, 0);      log_sync.unlock (); -    zmq_msg_close (&msg); +    rc = msg.close (); +    errno_assert (rc == 0);  } diff --git a/src/ctx.hpp b/src/ctx.hpp index 33d5dad..7d865fa 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -26,8 +26,6 @@  #include <string>  #include <stdarg.h> -#include "../include/zmq.h" -  #include "mailbox.hpp"  #include "semaphore.hpp"  #include "ypipe.hpp" diff --git a/src/decoder.cpp b/src/decoder.cpp index efb39e8..bcf5974 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -31,7 +31,8 @@ zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) :      destination (NULL),      maxmsgsize (maxmsgsize_)  { -    zmq_msg_init (&in_progress); +    int rc = in_progress.init (); +    errno_assert (rc == 0);      //  At the beginning, read one byte and go to one_byte_size_ready state.      next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready); @@ -39,7 +40,8 @@ zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) :  zmq::decoder_t::~decoder_t ()  { -    zmq_msg_close (&in_progress); +    int rc = in_progress.close (); +    errno_assert (rc == 0);  }  void zmq::decoder_t::set_inout (i_inout *destination_) @@ -71,9 +73,9 @@ bool zmq::decoder_t::one_byte_size_ready ()              errno = ENOMEM;          }          else -            rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1); +            rc = in_progress.init_size (*tmpbuf - 1);          if (rc != 0 && errno == ENOMEM) { -            rc = zmq_msg_init (&in_progress); +            rc = in_progress.init ();              errno_assert (rc == 0);              decoding_error ();              return false; @@ -106,9 +108,9 @@ bool zmq::decoder_t::eight_byte_size_ready ()          errno = ENOMEM;      }      else -        rc = zmq_msg_init_size (&in_progress, size - 1); +        rc = in_progress.init_size (size - 1);      if (rc != 0 && errno == ENOMEM) { -        rc = zmq_msg_init (&in_progress); +        rc = in_progress.init ();          errno_assert (rc == 0);          decoding_error ();          return false; @@ -122,9 +124,9 @@ bool zmq::decoder_t::eight_byte_size_ready ()  bool zmq::decoder_t::flags_ready ()  {      //  Store the flags from the wire into the message structure. -    in_progress.flags = tmpbuf [0]; +    in_progress.set_flags (tmpbuf [0]); -    next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), +    next_step (in_progress.data (), in_progress.size (),          &decoder_t::message_ready);      return true; diff --git a/src/decoder.hpp b/src/decoder.hpp index 23806a3..114ecef 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -27,10 +27,9 @@  #include <algorithm>  #include "err.hpp" +#include "msg.hpp"  #include "stdint.hpp" -#include "../include/zmq.h" -  namespace zmq  { @@ -196,7 +195,7 @@ namespace zmq          struct i_inout *destination;          unsigned char tmpbuf [8]; -        ::zmq_msg_t in_progress; +        msg_t in_progress;          int64_t maxmsgsize; diff --git a/src/dist.cpp b/src/dist.cpp index 9d50368..093da79 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -18,8 +18,6 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ -#include "../include/zmq.h" -  #include "dist.hpp"  #include "pipe.hpp"  #include "err.hpp" @@ -89,10 +87,10 @@ void zmq::dist_t::activated (writer_t *pipe_)      active++;  } -int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) +int zmq::dist_t::send (msg_t *msg_, int flags_)  {      //  Is this end of a multipart message? -    bool msg_more = msg_->flags & ZMQ_MSG_MORE; +    bool msg_more = msg_->flags () & msg_t::more;      //  Push the message to active pipes.      distribute (msg_, flags_); @@ -106,63 +104,33 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)      return 0;  } -void zmq::dist_t::distribute (zmq_msg_t *msg_, int flags_) +void zmq::dist_t::distribute (msg_t *msg_, int flags_)  {      //  If there are no active pipes available, simply drop the message.      if (active == 0) { -        int rc = zmq_msg_close (msg_); -        zmq_assert (rc == 0); -        rc = zmq_msg_init (msg_); -        zmq_assert (rc == 0); -        return; -    } - -    msg_content_t *content = (msg_content_t*) msg_->content; - -    //  For VSMs the copying is straighforward. -    if (content == (msg_content_t*) ZMQ_VSM) { -        for (pipes_t::size_type i = 0; i < active;) -            if (write (pipes [i], msg_)) -                i++; -        int rc = zmq_msg_init (msg_); -        zmq_assert (rc == 0); -        return; -    } - -    //  Optimisation for the case when there's only a single pipe -    //  to send the message to - no refcount adjustment i.e. no atomic -    //  operations are needed. -    if (active == 1) { -        if (!write (pipes [0], msg_)) { -            int rc = zmq_msg_close (msg_); -            zmq_assert (rc == 0); -        } -        int rc = zmq_msg_init (msg_); +        int rc = msg_->close (); +        errno_assert (rc == 0); +        rc = msg_->init ();          zmq_assert (rc == 0);          return;      } -    //  There are at least 2 destinations for the message. That means we have -    //  to deal with reference counting. First add N-1 references to -    //  the content (we are holding one reference anyway, that's why -1). -    if (msg_->flags & ZMQ_MSG_SHARED) -        content->refcnt.add (active - 1); -    else { -        content->refcnt.set (active); -        msg_->flags |= ZMQ_MSG_SHARED; -    } +    //  Add active-1 references to the message. We already hold one reference, +    //  that's why -1. +    msg_->add_refs (active - 1); -    //  Push the message to all destinations. +    //  Push copy of the message to each active pipe.      for (pipes_t::size_type i = 0; i < active;) {          if (!write (pipes [i], msg_)) -            content->refcnt.sub (1); +            msg_->rm_refs (1);          else              i++;      } -    //  Detach the original message from the data buffer. -    int rc = zmq_msg_init (msg_); -    zmq_assert (rc == 0); +    //  Detach the original message from the data buffer. Note that we don't +    //  close the message. That's because we've already used all the references. +    int rc = msg_->init (); +    errno_assert (rc == 0);  }  bool zmq::dist_t::has_out () @@ -170,14 +138,14 @@ bool zmq::dist_t::has_out ()      return true;  } -bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_) +bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_)  {      if (!pipe_->write (msg_)) {          active--;          pipes.swap (pipes.index (pipe_), active);          return false;      } -    if (!(msg_->flags & ZMQ_MSG_MORE)) +    if (!(msg_->flags () & msg_t::more))          pipe_->flush ();      return true;  } diff --git a/src/dist.hpp b/src/dist.hpp index ad9767a..ea05305 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -40,7 +40,7 @@ namespace zmq          void attach (writer_t *pipe_);          void terminate (); -        int send (zmq_msg_t *msg_, int flags_); +        int send (class msg_t *msg_, int flags_);          bool has_out ();          //  i_writer_events interface implementation. @@ -51,10 +51,10 @@ namespace zmq          //  Write the message to the pipe. Make the pipe inactive if writing          //  fails. In such a case false is returned. -        bool write (class writer_t *pipe_, zmq_msg_t *msg_); +        bool write (class writer_t *pipe_, class msg_t *msg_);          //  Put the message to all active pipes. -        void distribute (zmq_msg_t *msg_, int flags_); +        void distribute (class msg_t *msg_, int flags_);          //  Plug in all the delayed pipes.          void clear_new_pipes (); diff --git a/src/encoder.cpp b/src/encoder.cpp index 88e1dff..a42f06f 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -26,7 +26,8 @@ zmq::encoder_t::encoder_t (size_t bufsize_) :      encoder_base_t <encoder_t> (bufsize_),      source (NULL)  { -    zmq_msg_init (&in_progress); +    int rc = in_progress.init (); +    errno_assert (rc == 0);      //  Write 0 bytes to the batch and go to message_ready state.      next_step (NULL, 0, &encoder_t::message_ready, true); @@ -34,7 +35,8 @@ zmq::encoder_t::encoder_t (size_t bufsize_) :  zmq::encoder_t::~encoder_t ()  { -    zmq_msg_close (&in_progress); +    int rc = in_progress.close (); +    errno_assert (rc == 0);  }  void zmq::encoder_t::set_inout (i_inout *source_) @@ -45,7 +47,7 @@ void zmq::encoder_t::set_inout (i_inout *source_)  bool zmq::encoder_t::size_ready ()  {      //  Write message body into the buffer. -    next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), +    next_step (in_progress.data (), in_progress.size (),          &encoder_t::message_ready, false);      return true;  } @@ -53,19 +55,21 @@ bool zmq::encoder_t::size_ready ()  bool zmq::encoder_t::message_ready ()  {      //  Destroy content of the old message. -    zmq_msg_close (&in_progress); +    int rc = in_progress.close (); +    errno_assert (rc == 0);      //  Read new message. If there is none, return false.      //  Note that new state is set only if write is successful. That way      //  unsuccessful write will cause retry on the next state machine      //  invocation.      if (!source || !source->read (&in_progress)) { -        zmq_msg_init (&in_progress); +        rc = in_progress.init (); +        errno_assert (rc == 0);          return false;      }      //  Get the message size. -    size_t size = zmq_msg_size (&in_progress); +    size_t size = in_progress.size ();      //  Account for the 'flags' byte.      size++; @@ -75,16 +79,16 @@ bool zmq::encoder_t::message_ready ()      //  message size. In both cases 'flags' field follows.      if (size < 255) {          tmpbuf [0] = (unsigned char) size; -        tmpbuf [1] = (in_progress.flags & ~ZMQ_MSG_SHARED); +        tmpbuf [1] = (in_progress.flags () & ~msg_t::shared);          next_step (tmpbuf, 2, &encoder_t::size_ready, -            !(in_progress.flags & ZMQ_MSG_MORE)); +            !(in_progress.flags () & msg_t::more));      }      else {          tmpbuf [0] = 0xff;          put_uint64 (tmpbuf + 1, size); -        tmpbuf [9] = (in_progress.flags & ~ZMQ_MSG_SHARED); +        tmpbuf [9] = (in_progress.flags () & ~msg_t::shared);          next_step (tmpbuf, 10, &encoder_t::size_ready, -            !(in_progress.flags & ZMQ_MSG_MORE)); +            !(in_progress.flags () & msg_t::more));      }      return true;  } diff --git a/src/encoder.hpp b/src/encoder.hpp index 918ec2b..617b65b 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -27,8 +27,7 @@  #include <algorithm>  #include "err.hpp" - -#include "../include/zmq.h" +#include "msg.hpp"  namespace zmq  { @@ -172,7 +171,7 @@ namespace zmq          bool message_ready ();          struct i_inout *source; -        ::zmq_msg_t in_progress; +        msg_t in_progress;          unsigned char tmpbuf [10];          encoder_t (const encoder_t&); diff --git a/src/err.cpp b/src/err.cpp index 8761c22..87a0006 100644 --- a/src/err.cpp +++ b/src/err.cpp @@ -18,8 +18,6 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ -#include "../include/zmq.h" -  #include "err.hpp"  #include "platform.hpp" diff --git a/src/err.hpp b/src/err.hpp index 3ffd99d..6289a08 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -21,6 +21,9 @@  #ifndef __ZMQ_ERR_HPP_INCLUDED__  #define __ZMQ_ERR_HPP_INCLUDED__ +//  0MQ-specific error codes are defined in zmq.h +#include "../include/zmq.h" +  #include <assert.h>  #include <errno.h>  #include <string.h> @@ -18,12 +18,11 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ -#include "../include/zmq.h" -  #include "fq.hpp"  #include "pipe.hpp"  #include "err.hpp"  #include "own.hpp" +#include "msg.hpp"  zmq::fq_t::fq_t (own_t *sink_) :      active (0), @@ -95,10 +94,11 @@ void zmq::fq_t::activated (reader_t *pipe_)      active++;  } -int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_) +int zmq::fq_t::recv (msg_t *msg_, int flags_)  {      //  Deallocate old content of the message. -    zmq_msg_close (msg_); +    int rc = msg_->close (); +    errno_assert (rc == 0);      //  Round-robin over the pipes to get the next message.      for (int count = active; count != 0; count--) { @@ -116,7 +116,7 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)          //  and replaced by another active pipe. Thus we don't have to increase          //  the 'current' pointer.          if (fetched) { -            more = msg_->flags & ZMQ_MSG_MORE; +            more = msg_->flags () & msg_t::more;              if (!more) {                  current++;                  if (current >= active) @@ -134,7 +134,8 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)      //  No message is available. Initialise the output parameter      //  to be a 0-byte message. -    zmq_msg_init (msg_); +    rc = msg_->init (); +    errno_assert (rc == 0);      errno = EAGAIN;      return -1;  } @@ -23,6 +23,7 @@  #include "array.hpp"  #include "pipe.hpp" +#include "msg.hpp"  namespace zmq  { @@ -40,7 +41,7 @@ namespace zmq          void attach (reader_t *pipe_);          void terminate (); -        int recv (zmq_msg_t *msg_, int flags_); +        int recv (msg_t *msg_, int flags_);          bool has_in ();          //  i_reader_events implementation. diff --git a/src/i_inout.hpp b/src/i_inout.hpp index 057b46c..3f8e8e0 100644 --- a/src/i_inout.hpp +++ b/src/i_inout.hpp @@ -21,8 +21,7 @@  #ifndef __ZMQ_I_INOUT_HPP_INCLUDED__  #define __ZMQ_I_INOUT_HPP_INCLUDED__ -#include "../include/zmq.h" - +#include "msg.hpp"  #include "stdint.hpp"  namespace zmq @@ -33,10 +32,10 @@ namespace zmq          virtual ~i_inout () {}          //  Engine asks for a message to send to the network. -        virtual bool read (::zmq_msg_t *msg_) = 0; +        virtual bool read (msg_t *msg_) = 0;          //  Engine received message from the network and sends it further on. -        virtual bool write (::zmq_msg_t *msg_) = 0; +        virtual bool write (msg_t *msg_) = 0;          //  Flush all the previously written messages.          virtual void flush () = 0; diff --git a/src/io_thread.cpp b/src/io_thread.cpp index be52bdd..9678392 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -20,8 +20,6 @@  #include <new> -#include "../include/zmq.h" -  #include "io_thread.hpp"  #include "platform.hpp"  #include "err.hpp" @@ -23,11 +23,9 @@  #include <stdlib.h>  #include <string> -#include "../include/zmq.h" -  #include "ip.hpp" -#include "platform.hpp"  #include "err.hpp" +#include "platform.hpp"  #include "stdint.hpp"  #if defined ZMQ_HAVE_SOLARIS @@ -18,12 +18,11 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ -#include "../include/zmq.h" -  #include "lb.hpp"  #include "pipe.hpp"  #include "err.hpp"  #include "own.hpp" +#include "msg.hpp"  zmq::lb_t::lb_t (own_t *sink_) :      active (0), @@ -93,26 +92,26 @@ void zmq::lb_t::activated (writer_t *pipe_)      active++;  } -int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) +int zmq::lb_t::send (msg_t *msg_, int flags_)  {      //  Drop the message if required. If we are at the end of the message      //  switch back to non-dropping mode.      if (dropping) { -        more = msg_->flags & ZMQ_MSG_MORE; +        more = msg_->flags () & msg_t::more;          if (!more)              dropping = false; -        int rc = zmq_msg_close (msg_); +        int rc = msg_->close ();          errno_assert (rc == 0); -        rc = zmq_msg_init (msg_); +        rc = msg_->init ();          zmq_assert (rc == 0);          return 0;      }      while (active > 0) {          if (pipes [current]->write (msg_)) { -            more = msg_->flags & ZMQ_MSG_MORE; +            more = msg_->flags () & msg_t::more;              break;          } @@ -138,8 +137,8 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)      }      //  Detach the message from the data buffer. -    int rc = zmq_msg_init (msg_); -    zmq_assert (rc == 0); +    int rc = msg_->init (); +    errno_assert (rc == 0);      return 0;  } @@ -154,13 +153,16 @@ bool zmq::lb_t::has_out ()      while (active > 0) {          //  Check whether zero-sized message can be written to the pipe. -        zmq_msg_t msg; -        zmq_msg_init (&msg); +        msg_t msg; +        int rc = msg.init (); +        errno_assert (rc == 0);          if (pipes [current]->check_write (&msg)) { -            zmq_msg_close (&msg); +            rc = msg.close (); +            errno_assert (rc == 0);              return true;          } -        zmq_msg_close (&msg); +        rc = msg.close (); +        errno_assert (rc == 0);          //  Deactivate the pipe.          active--; @@ -38,7 +38,7 @@ namespace zmq          void attach (writer_t *pipe_);          void terminate (); -        int send (zmq_msg_t *msg_, int flags_); +        int send (msg_t *msg_, int flags_);          bool has_out ();          //  i_writer_events interface implementation. diff --git a/src/msg.cpp b/src/msg.cpp index e800bd6..bd6f066 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -29,155 +29,234 @@  #include "likely.hpp"  #include "err.hpp" -int zmq_msg_init (zmq_msg_t *msg_) +bool zmq::msg_t::check ()  { -    msg_->content = (zmq::msg_content_t*) ZMQ_VSM; -    msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; -    msg_->vsm_size = 0; +     return u.base.type >= type_min && u. | 
