diff options
| -rw-r--r-- | src/Makefile.am | 6 | ||||
| -rw-r--r-- | src/decoder.cpp (renamed from src/zmq_decoder.cpp) | 30 | ||||
| -rw-r--r-- | src/decoder.hpp | 47 | ||||
| -rw-r--r-- | src/encoder.cpp (renamed from src/zmq_encoder.cpp) | 22 | ||||
| -rw-r--r-- | src/encoder.hpp | 39 | ||||
| -rw-r--r-- | src/pgm_receiver.cpp | 2 | ||||
| -rw-r--r-- | src/pgm_receiver.hpp | 6 | ||||
| -rw-r--r-- | src/pgm_sender.hpp | 4 | ||||
| -rw-r--r-- | src/zmq_decoder.hpp | 59 | ||||
| -rw-r--r-- | src/zmq_encoder.hpp | 55 | ||||
| -rw-r--r-- | src/zmq_engine.hpp | 8 | 
11 files changed, 106 insertions, 172 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 60c2584..ad05a4b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -120,15 +120,15 @@ libzmq_la_SOURCES = \      ypipe.hpp \      yqueue.hpp \      zmq_connecter.hpp \ -    zmq_decoder.hpp \ -    zmq_encoder.hpp \      zmq_engine.hpp \      zmq_init.hpp \      zmq_listener.hpp \      command.cpp \      ctx.cpp \      connect_session.cpp \ +    decoder.cpp \      devpoll.cpp \ +    encoder.cpp \      epoll.cpp \      err.cpp \      forwarder.cpp \ @@ -172,8 +172,6 @@ libzmq_la_SOURCES = \      xreq.cpp \      zmq.cpp \      zmq_connecter.cpp \ -    zmq_decoder.cpp \ -    zmq_encoder.cpp \      zmq_engine.cpp \      zmq_init.cpp \      zmq_listener.cpp diff --git a/src/zmq_decoder.cpp b/src/decoder.cpp index dcf8e76..131ee24 100644 --- a/src/zmq_decoder.cpp +++ b/src/decoder.cpp @@ -20,38 +20,38 @@  #include <stdlib.h>  #include <string.h> -#include "zmq_decoder.hpp" +#include "decoder.hpp"  #include "i_inout.hpp"  #include "wire.hpp"  #include "err.hpp" -zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) : -    decoder_t <zmq_decoder_t> (bufsize_), +zmq::decoder_t::decoder_t (size_t bufsize_) : +    decoder_base_t <decoder_t> (bufsize_),      destination (NULL)  {      zmq_msg_init (&in_progress);      //  At the beginning, read one byte and go to one_byte_size_ready state. -    next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); +    next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);  } -zmq::zmq_decoder_t::~zmq_decoder_t () +zmq::decoder_t::~decoder_t ()  {      zmq_msg_close (&in_progress);  } -void zmq::zmq_decoder_t::set_inout (i_inout *destination_) +void zmq::decoder_t::set_inout (i_inout *destination_)  {      destination = destination_;  } -bool zmq::zmq_decoder_t::one_byte_size_ready () +bool zmq::decoder_t::one_byte_size_ready ()  {      //  First byte of size is read. If it is 0xff read 8-byte size.      //  Otherwise allocate the buffer for message data and read the      //  message data into it.      if (*tmpbuf == 0xff) -        next_step (tmpbuf, 8, &zmq_decoder_t::eight_byte_size_ready); +        next_step (tmpbuf, 8, &decoder_t::eight_byte_size_ready);      else {          //  TODO:  Handle over-sized message decently. @@ -64,12 +64,12 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()          //  message and thus we can treat it as uninitialised...          int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1);          errno_assert (rc == 0); -        next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready); +        next_step (tmpbuf, 1, &decoder_t::flags_ready);      }      return true;  } -bool zmq::zmq_decoder_t::eight_byte_size_ready () +bool zmq::decoder_t::eight_byte_size_ready ()  {      //  8-byte size is read. Allocate the buffer for message body and      //  read the message data into it. @@ -86,29 +86,29 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()      //  message and thus we can treat it as uninitialised...      int rc = zmq_msg_init_size (&in_progress, size - 1);      errno_assert (rc == 0); -    next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready); +    next_step (tmpbuf, 1, &decoder_t::flags_ready);      return true;  } -bool zmq::zmq_decoder_t::flags_ready () +bool zmq::decoder_t::flags_ready ()  {      //  Store the flags from the wire into the message structure.      in_progress.flags = tmpbuf [0];      next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), -        &zmq_decoder_t::message_ready); +        &decoder_t::message_ready);      return true;  } -bool zmq::zmq_decoder_t::message_ready () +bool zmq::decoder_t::message_ready ()  {      //  Message is completely read. Push it further and start reading      //  new message. (in_progress is a 0-byte message after this point.)      if (!destination || !destination->write (&in_progress))          return false; -    next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); +    next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);      return true;  } diff --git a/src/decoder.hpp b/src/decoder.hpp index f05f651..87982a0 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -27,25 +27,27 @@  #include "err.hpp" +#include "../include/zmq.h" +  namespace zmq  {      //  Helper base class for decoders that know the amount of data to read      //  in advance at any moment. Knowing the amount in advance is a property -    //  of the protocol used. Both AMQP and backend protocol are based on -    //  size-prefixed paradigm, therefore they are using decoder_t to parse -    //  the messages. On the other hand, XML-based transports (like XMPP or -    //  SOAP) don't allow for knowing the size of data to read in advance and -    //  should use different decoding algorithms. +    //  of the protocol used. 0MQ framing protocol is based size-prefixed +    //  paradigm, whixh qualifies it to be parsed by this class. +    //  On the other hand, XML-based transports (like XMPP or SOAP) don't allow +    //  for knowing the size of data to read in advance and should use different +    //  decoding algorithms.      // -    //  Decoder implements the state machine that parses the incoming buffer. +    //  This class implements the state machine that parses the incoming buffer.      //  Derived class should implement individual state machine actions. -    template <typename T> class decoder_t +    template <typename T> class decoder_base_t      {      public: -        inline decoder_t (size_t bufsize_) : +        inline decoder_base_t (size_t bufsize_) :              read_pos (NULL),              to_read (0),              next (NULL), @@ -57,7 +59,7 @@ namespace zmq          //  The destructor doesn't have to be virtual. It is mad virtual          //  just to keep ICC and code checking tools from complaining. -        inline virtual ~decoder_t () +        inline virtual ~decoder_base_t ()          {              free (buf);          } @@ -149,6 +151,32 @@ namespace zmq          size_t bufsize;          unsigned char *buf; +        decoder_base_t (const decoder_base_t&); +        void operator = (const decoder_base_t&); +    }; + +    //  Decoder for 0MQ framing protocol. Converts data batches into messages. + +    class decoder_t : public decoder_base_t <decoder_t> +    { +    public: + +        decoder_t (size_t bufsize_); +        ~decoder_t (); + +        void set_inout (struct i_inout *destination_); + +    private: + +        bool one_byte_size_ready (); +        bool eight_byte_size_ready (); +        bool flags_ready (); +        bool message_ready (); + +        struct i_inout *destination; +        unsigned char tmpbuf [8]; +        ::zmq_msg_t in_progress; +          decoder_t (const decoder_t&);          void operator = (const decoder_t&);      }; @@ -156,3 +184,4 @@ namespace zmq  }  #endif + diff --git a/src/zmq_encoder.cpp b/src/encoder.cpp index d552c61..be9a2c2 100644 --- a/src/zmq_encoder.cpp +++ b/src/encoder.cpp @@ -17,39 +17,39 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ -#include "zmq_encoder.hpp" +#include "encoder.hpp"  #include "i_inout.hpp"  #include "wire.hpp" -zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) : -    encoder_t <zmq_encoder_t> (bufsize_), +zmq::encoder_t::encoder_t (size_t bufsize_) : +    encoder_base_t <encoder_t> (bufsize_),      source (NULL)  {      zmq_msg_init (&in_progress);      //  Write 0 bytes to the batch and go to message_ready state. -    next_step (NULL, 0, &zmq_encoder_t::message_ready, true); +    next_step (NULL, 0, &encoder_t::message_ready, true);  } -zmq::zmq_encoder_t::~zmq_encoder_t () +zmq::encoder_t::~encoder_t ()  {      zmq_msg_close (&in_progress);  } -void zmq::zmq_encoder_t::set_inout (i_inout *source_) +void zmq::encoder_t::set_inout (i_inout *source_)  {      source = source_;  } -bool zmq::zmq_encoder_t::size_ready () +bool zmq::encoder_t::size_ready ()  {      //  Write message body into the buffer.      next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), -        &zmq_encoder_t::message_ready, false); +        &encoder_t::message_ready, false);      return true;  } -bool zmq::zmq_encoder_t::message_ready () +bool zmq::encoder_t::message_ready ()  {      //  Destroy content of the old message.      zmq_msg_close (&in_progress); @@ -75,14 +75,14 @@ bool zmq::zmq_encoder_t::message_ready ()      if (size < 255) {          tmpbuf [0] = (unsigned char) size;          tmpbuf [1] = (in_progress.flags & ~ZMQ_MSG_SHARED); -        next_step (tmpbuf, 2, &zmq_encoder_t::size_ready, +        next_step (tmpbuf, 2, &encoder_t::size_ready,              !(in_progress.flags & ZMQ_MSG_MORE));      }      else {          tmpbuf [0] = 0xff;          put_uint64 (tmpbuf + 1, size);          tmpbuf [9] = (in_progress.flags & ~ZMQ_MSG_SHARED); -        next_step (tmpbuf, 10, &zmq_encoder_t::size_ready, +        next_step (tmpbuf, 10, &encoder_t::size_ready,              !(in_progress.flags & ZMQ_MSG_MORE));      }      return true; diff --git a/src/encoder.hpp b/src/encoder.hpp index 0d5b6ba..54cbb34 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -20,11 +20,6 @@  #ifndef __ZMQ_ENCODER_HPP_INCLUDED__  #define __ZMQ_ENCODER_HPP_INCLUDED__ -#include "platform.hpp" -#if defined ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#endif -  #include <stddef.h>  #include <string.h>  #include <stdlib.h> @@ -32,6 +27,8 @@  #include "err.hpp" +#include "../include/zmq.h" +  namespace zmq  { @@ -39,11 +36,11 @@ namespace zmq      //  fills the outgoing buffer. Derived classes should implement individual      //  state machine actions. -    template <typename T> class encoder_t +    template <typename T> class encoder_base_t      {      public: -        inline encoder_t (size_t bufsize_) : +        inline encoder_base_t (size_t bufsize_) :              bufsize (bufsize_)          {              buf = (unsigned char*) malloc (bufsize_); @@ -52,7 +49,7 @@ namespace zmq          //  The destructor doesn't have to be virtual. It is mad virtual          //  just to keep ICC and code checking tools from complaining. -        inline virtual ~encoder_t () +        inline virtual ~encoder_base_t ()          {              free (buf);          } @@ -153,10 +150,34 @@ namespace zmq          size_t bufsize;          unsigned char *buf; +        encoder_base_t (const encoder_base_t&); +        void operator = (const encoder_base_t&); +    }; + +    //  Encoder for 0MQ framing protocol. Converts messages into data batches. + +    class encoder_t : public encoder_base_t <encoder_t> +    { +    public: + +        encoder_t (size_t bufsize_); +        ~encoder_t (); + +        void set_inout (struct i_inout *source_); + +    private: + +        bool size_ready (); +        bool message_ready (); + +        struct i_inout *source; +        ::zmq_msg_t in_progress; +        unsigned char tmpbuf [10]; +          encoder_t (const encoder_t&);          void operator = (const encoder_t&);      }; -  }  #endif + diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index ff61b96..5532546 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -195,7 +195,7 @@ void zmq::pgm_receiver_t::in_event ()              it->second.joined = true;              //  Create and connect decoder for the peer. -            it->second.decoder = new (std::nothrow) zmq_decoder_t (0); +            it->second.decoder = new (std::nothrow) decoder_t (0);              it->second.decoder->set_inout (inout);          } diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 7215324..bbdb31d 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -34,7 +34,7 @@  #include "io_object.hpp"  #include "i_engine.hpp"  #include "options.hpp" -#include "zmq_decoder.hpp" +#include "decoder.hpp"  #include "pgm_socket.hpp"  namespace zmq @@ -68,7 +68,7 @@ namespace zmq          struct peer_info_t          {              bool joined; -            zmq_decoder_t *decoder; +            decoder_t *decoder;          };          struct tsi_comp @@ -98,7 +98,7 @@ namespace zmq          i_inout *inout;          //  Most recently used decoder. -        zmq_decoder_t *mru_decoder; +        decoder_t *mru_decoder;          //  Number of bytes not consumed by the decoder due to pipe overflow.          size_t pending_bytes; diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index a1ac329..bee416c 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -33,7 +33,7 @@  #include "i_engine.hpp"  #include "options.hpp"  #include "pgm_socket.hpp" -#include "zmq_encoder.hpp" +#include "encoder.hpp"  namespace zmq  { @@ -62,7 +62,7 @@ namespace zmq      private:          //  Message encoder. -        zmq_encoder_t encoder; +        encoder_t encoder;          //  PGM socket.          pgm_socket_t pgm_socket; diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp deleted file mode 100644 index c1e3e3e..0000000 --- a/src/zmq_decoder.hpp +++ /dev/null @@ -1,59 +0,0 @@ -/* -    Copyright (c) 2007-2010 iMatix Corporation - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the Lesser GNU General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    Lesser GNU General Public License for more details. - -    You should have received a copy of the Lesser GNU General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_ZMQ_DECODER_HPP_INCLUDED__ -#define __ZMQ_ZMQ_DECODER_HPP_INCLUDED__ - -#include "../include/zmq.h" - -#include "decoder.hpp" -#include "blob.hpp" - -namespace zmq -{ -    //  Decoder for 0MQ backend protocol. Converts data batches into messages. - -    class zmq_decoder_t : public decoder_t <zmq_decoder_t> -    { -    public: - -        zmq_decoder_t (size_t bufsize_); -        ~zmq_decoder_t (); - -        void set_inout (struct i_inout *destination_); - -    private: - -        bool one_byte_size_ready (); -        bool eight_byte_size_ready (); -        bool flags_ready (); -        bool message_ready (); - -        struct i_inout *destination; -        unsigned char tmpbuf [8]; -        ::zmq_msg_t in_progress; - -        zmq_decoder_t (const zmq_decoder_t&); -        void operator = (const zmq_decoder_t&); -    }; - -} - -#endif - diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp deleted file mode 100644 index 61899f4..0000000 --- a/src/zmq_encoder.hpp +++ /dev/null @@ -1,55 +0,0 @@ -/* -    Copyright (c) 2007-2010 iMatix Corporation - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the Lesser GNU General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    Lesser GNU General Public License for more details. - -    You should have received a copy of the Lesser GNU General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__ -#define __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__ - -#include "../include/zmq.h" - -#include "encoder.hpp" - -namespace zmq -{ -    //  Encoder for 0MQ backend protocol. Converts messages into data batches. - -    class zmq_encoder_t : public encoder_t <zmq_encoder_t> -    { -    public: - -        zmq_encoder_t (size_t bufsize_); -        ~zmq_encoder_t (); - -        void set_inout (struct i_inout *source_); - -    private: - -        bool size_ready (); -        bool message_ready (); - -        struct i_inout *source; -        ::zmq_msg_t in_progress; -        unsigned char tmpbuf [10]; - -        zmq_encoder_t (const zmq_encoder_t&); -        void operator = (const zmq_encoder_t&); -    }; -} - -#endif - diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 1023051..bdd2a5d 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -27,8 +27,8 @@  #include "i_engine.hpp"  #include "io_object.hpp"  #include "tcp_socket.hpp" -#include "zmq_encoder.hpp" -#include "zmq_decoder.hpp" +#include "encoder.hpp" +#include "decoder.hpp"  #include "options.hpp"  namespace zmq @@ -62,11 +62,11 @@ namespace zmq          unsigned char *inpos;          size_t insize; -        zmq_decoder_t decoder; +        decoder_t decoder;          unsigned char *outpos;          size_t outsize; -        zmq_encoder_t encoder; +        encoder_t encoder;          i_inout *inout;  | 
