diff options
-rw-r--r-- | src/Makefile.am | 6 | ||||
-rw-r--r-- | src/decoder.cpp (renamed from src/zmq_decoder.cpp) | 30 | ||||
-rw-r--r-- | src/decoder.hpp | 47 | ||||
-rw-r--r-- | src/encoder.cpp (renamed from src/zmq_encoder.cpp) | 22 | ||||
-rw-r--r-- | src/encoder.hpp | 39 | ||||
-rw-r--r-- | src/pgm_receiver.cpp | 2 | ||||
-rw-r--r-- | src/pgm_receiver.hpp | 6 | ||||
-rw-r--r-- | src/pgm_sender.hpp | 4 | ||||
-rw-r--r-- | src/zmq_decoder.hpp | 59 | ||||
-rw-r--r-- | src/zmq_encoder.hpp | 55 | ||||
-rw-r--r-- | src/zmq_engine.hpp | 8 |
11 files changed, 106 insertions, 172 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 60c2584..ad05a4b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -120,15 +120,15 @@ libzmq_la_SOURCES = \ ypipe.hpp \ yqueue.hpp \ zmq_connecter.hpp \ - zmq_decoder.hpp \ - zmq_encoder.hpp \ zmq_engine.hpp \ zmq_init.hpp \ zmq_listener.hpp \ command.cpp \ ctx.cpp \ connect_session.cpp \ + decoder.cpp \ devpoll.cpp \ + encoder.cpp \ epoll.cpp \ err.cpp \ forwarder.cpp \ @@ -172,8 +172,6 @@ libzmq_la_SOURCES = \ xreq.cpp \ zmq.cpp \ zmq_connecter.cpp \ - zmq_decoder.cpp \ - zmq_encoder.cpp \ zmq_engine.cpp \ zmq_init.cpp \ zmq_listener.cpp diff --git a/src/zmq_decoder.cpp b/src/decoder.cpp index dcf8e76..131ee24 100644 --- a/src/zmq_decoder.cpp +++ b/src/decoder.cpp @@ -20,38 +20,38 @@ #include <stdlib.h> #include <string.h> -#include "zmq_decoder.hpp" +#include "decoder.hpp" #include "i_inout.hpp" #include "wire.hpp" #include "err.hpp" -zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) : - decoder_t <zmq_decoder_t> (bufsize_), +zmq::decoder_t::decoder_t (size_t bufsize_) : + decoder_base_t <decoder_t> (bufsize_), destination (NULL) { zmq_msg_init (&in_progress); // At the beginning, read one byte and go to one_byte_size_ready state. - next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); + next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready); } -zmq::zmq_decoder_t::~zmq_decoder_t () +zmq::decoder_t::~decoder_t () { zmq_msg_close (&in_progress); } -void zmq::zmq_decoder_t::set_inout (i_inout *destination_) +void zmq::decoder_t::set_inout (i_inout *destination_) { destination = destination_; } -bool zmq::zmq_decoder_t::one_byte_size_ready () +bool zmq::decoder_t::one_byte_size_ready () { // First byte of size is read. If it is 0xff read 8-byte size. // Otherwise allocate the buffer for message data and read the // message data into it. if (*tmpbuf == 0xff) - next_step (tmpbuf, 8, &zmq_decoder_t::eight_byte_size_ready); + next_step (tmpbuf, 8, &decoder_t::eight_byte_size_ready); else { // TODO: Handle over-sized message decently. @@ -64,12 +64,12 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () // message and thus we can treat it as uninitialised... int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1); errno_assert (rc == 0); - next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready); + next_step (tmpbuf, 1, &decoder_t::flags_ready); } return true; } -bool zmq::zmq_decoder_t::eight_byte_size_ready () +bool zmq::decoder_t::eight_byte_size_ready () { // 8-byte size is read. Allocate the buffer for message body and // read the message data into it. @@ -86,29 +86,29 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () // message and thus we can treat it as uninitialised... int rc = zmq_msg_init_size (&in_progress, size - 1); errno_assert (rc == 0); - next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready); + next_step (tmpbuf, 1, &decoder_t::flags_ready); return true; } -bool zmq::zmq_decoder_t::flags_ready () +bool zmq::decoder_t::flags_ready () { // Store the flags from the wire into the message structure. in_progress.flags = tmpbuf [0]; next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), - &zmq_decoder_t::message_ready); + &decoder_t::message_ready); return true; } -bool zmq::zmq_decoder_t::message_ready () +bool zmq::decoder_t::message_ready () { // Message is completely read. Push it further and start reading // new message. (in_progress is a 0-byte message after this point.) if (!destination || !destination->write (&in_progress)) return false; - next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); + next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready); return true; } diff --git a/src/decoder.hpp b/src/decoder.hpp index f05f651..87982a0 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -27,25 +27,27 @@ #include "err.hpp" +#include "../include/zmq.h" + namespace zmq { // Helper base class for decoders that know the amount of data to read // in advance at any moment. Knowing the amount in advance is a property - // of the protocol used. Both AMQP and backend protocol are based on - // size-prefixed paradigm, therefore they are using decoder_t to parse - // the messages. On the other hand, XML-based transports (like XMPP or - // SOAP) don't allow for knowing the size of data to read in advance and - // should use different decoding algorithms. + // of the protocol used. 0MQ framing protocol is based size-prefixed + // paradigm, whixh qualifies it to be parsed by this class. + // On the other hand, XML-based transports (like XMPP or SOAP) don't allow + // for knowing the size of data to read in advance and should use different + // decoding algorithms. // - // Decoder implements the state machine that parses the incoming buffer. + // This class implements the state machine that parses the incoming buffer. // Derived class should implement individual state machine actions. - template <typename T> class decoder_t + template <typename T> class decoder_base_t { public: - inline decoder_t (size_t bufsize_) : + inline decoder_base_t (size_t bufsize_) : read_pos (NULL), to_read (0), next (NULL), @@ -57,7 +59,7 @@ namespace zmq // The destructor doesn't have to be virtual. It is mad virtual // just to keep ICC and code checking tools from complaining. - inline virtual ~decoder_t () + inline virtual ~decoder_base_t () { free (buf); } @@ -149,6 +151,32 @@ namespace zmq size_t bufsize; unsigned char *buf; + decoder_base_t (const decoder_base_t&); + void operator = (const decoder_base_t&); + }; + + // Decoder for 0MQ framing protocol. Converts data batches into messages. + + class decoder_t : public decoder_base_t <decoder_t> + { + public: + + decoder_t (size_t bufsize_); + ~decoder_t (); + + void set_inout (struct i_inout *destination_); + + private: + + bool one_byte_size_ready (); + bool eight_byte_size_ready (); + bool flags_ready (); + bool message_ready (); + + struct i_inout *destination; + unsigned char tmpbuf [8]; + ::zmq_msg_t in_progress; + decoder_t (const decoder_t&); void operator = (const decoder_t&); }; @@ -156,3 +184,4 @@ namespace zmq } #endif + diff --git a/src/zmq_encoder.cpp b/src/encoder.cpp index d552c61..be9a2c2 100644 --- a/src/zmq_encoder.cpp +++ b/src/encoder.cpp @@ -17,39 +17,39 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "zmq_encoder.hpp" +#include "encoder.hpp" #include "i_inout.hpp" #include "wire.hpp" -zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) : - encoder_t <zmq_encoder_t> (bufsize_), +zmq::encoder_t::encoder_t (size_t bufsize_) : + encoder_base_t <encoder_t> (bufsize_), source (NULL) { zmq_msg_init (&in_progress); // Write 0 bytes to the batch and go to message_ready state. - next_step (NULL, 0, &zmq_encoder_t::message_ready, true); + next_step (NULL, 0, &encoder_t::message_ready, true); } -zmq::zmq_encoder_t::~zmq_encoder_t () +zmq::encoder_t::~encoder_t () { zmq_msg_close (&in_progress); } -void zmq::zmq_encoder_t::set_inout (i_inout *source_) +void zmq::encoder_t::set_inout (i_inout *source_) { source = source_; } -bool zmq::zmq_encoder_t::size_ready () +bool zmq::encoder_t::size_ready () { // Write message body into the buffer. next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), - &zmq_encoder_t::message_ready, false); + &encoder_t::message_ready, false); return true; } -bool zmq::zmq_encoder_t::message_ready () +bool zmq::encoder_t::message_ready () { // Destroy content of the old message. zmq_msg_close (&in_progress); @@ -75,14 +75,14 @@ bool zmq::zmq_encoder_t::message_ready () if (size < 255) { tmpbuf [0] = (unsigned char) size; tmpbuf [1] = (in_progress.flags & ~ZMQ_MSG_SHARED); - next_step (tmpbuf, 2, &zmq_encoder_t::size_ready, + next_step (tmpbuf, 2, &encoder_t::size_ready, !(in_progress.flags & ZMQ_MSG_MORE)); } else { tmpbuf [0] = 0xff; put_uint64 (tmpbuf + 1, size); tmpbuf [9] = (in_progress.flags & ~ZMQ_MSG_SHARED); - next_step (tmpbuf, 10, &zmq_encoder_t::size_ready, + next_step (tmpbuf, 10, &encoder_t::size_ready, !(in_progress.flags & ZMQ_MSG_MORE)); } return true; diff --git a/src/encoder.hpp b/src/encoder.hpp index 0d5b6ba..54cbb34 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -20,11 +20,6 @@ #ifndef __ZMQ_ENCODER_HPP_INCLUDED__ #define __ZMQ_ENCODER_HPP_INCLUDED__ -#include "platform.hpp" -#if defined ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#endif - #include <stddef.h> #include <string.h> #include <stdlib.h> @@ -32,6 +27,8 @@ #include "err.hpp" +#include "../include/zmq.h" + namespace zmq { @@ -39,11 +36,11 @@ namespace zmq // fills the outgoing buffer. Derived classes should implement individual // state machine actions. - template <typename T> class encoder_t + template <typename T> class encoder_base_t { public: - inline encoder_t (size_t bufsize_) : + inline encoder_base_t (size_t bufsize_) : bufsize (bufsize_) { buf = (unsigned char*) malloc (bufsize_); @@ -52,7 +49,7 @@ namespace zmq // The destructor doesn't have to be virtual. It is mad virtual // just to keep ICC and code checking tools from complaining. - inline virtual ~encoder_t () + inline virtual ~encoder_base_t () { free (buf); } @@ -153,10 +150,34 @@ namespace zmq size_t bufsize; unsigned char *buf; + encoder_base_t (const encoder_base_t&); + void operator = (const encoder_base_t&); + }; + + // Encoder for 0MQ framing protocol. Converts messages into data batches. + + class encoder_t : public encoder_base_t <encoder_t> + { + public: + + encoder_t (size_t bufsize_); + ~encoder_t (); + + void set_inout (struct i_inout *source_); + + private: + + bool size_ready (); + bool message_ready (); + + struct i_inout *source; + ::zmq_msg_t in_progress; + unsigned char tmpbuf [10]; + encoder_t (const encoder_t&); void operator = (const encoder_t&); }; - } #endif + diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index ff61b96..5532546 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -195,7 +195,7 @@ void zmq::pgm_receiver_t::in_event () it->second.joined = true; // Create and connect decoder for the peer. - it->second.decoder = new (std::nothrow) zmq_decoder_t (0); + it->second.decoder = new (std::nothrow) decoder_t (0); it->second.decoder->set_inout (inout); } diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 7215324..bbdb31d 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -34,7 +34,7 @@ #include "io_object.hpp" #include "i_engine.hpp" #include "options.hpp" -#include "zmq_decoder.hpp" +#include "decoder.hpp" #include "pgm_socket.hpp" namespace zmq @@ -68,7 +68,7 @@ namespace zmq struct peer_info_t { bool joined; - zmq_decoder_t *decoder; + decoder_t *decoder; }; struct tsi_comp @@ -98,7 +98,7 @@ namespace zmq i_inout *inout; // Most recently used decoder. - zmq_decoder_t *mru_decoder; + decoder_t *mru_decoder; // Number of bytes not consumed by the decoder due to pipe overflow. size_t pending_bytes; diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index a1ac329..bee416c 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -33,7 +33,7 @@ #include "i_engine.hpp" #include "options.hpp" #include "pgm_socket.hpp" -#include "zmq_encoder.hpp" +#include "encoder.hpp" namespace zmq { @@ -62,7 +62,7 @@ namespace zmq private: // Message encoder. - zmq_encoder_t encoder; + encoder_t encoder; // PGM socket. pgm_socket_t pgm_socket; diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp deleted file mode 100644 index c1e3e3e..0000000 --- a/src/zmq_decoder.hpp +++ /dev/null @@ -1,59 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_ZMQ_DECODER_HPP_INCLUDED__ -#define __ZMQ_ZMQ_DECODER_HPP_INCLUDED__ - -#include "../include/zmq.h" - -#include "decoder.hpp" -#include "blob.hpp" - -namespace zmq -{ - // Decoder for 0MQ backend protocol. Converts data batches into messages. - - class zmq_decoder_t : public decoder_t <zmq_decoder_t> - { - public: - - zmq_decoder_t (size_t bufsize_); - ~zmq_decoder_t (); - - void set_inout (struct i_inout *destination_); - - private: - - bool one_byte_size_ready (); - bool eight_byte_size_ready (); - bool flags_ready (); - bool message_ready (); - - struct i_inout *destination; - unsigned char tmpbuf [8]; - ::zmq_msg_t in_progress; - - zmq_decoder_t (const zmq_decoder_t&); - void operator = (const zmq_decoder_t&); - }; - -} - -#endif - diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp deleted file mode 100644 index 61899f4..0000000 --- a/src/zmq_encoder.hpp +++ /dev/null @@ -1,55 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__ -#define __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__ - -#include "../include/zmq.h" - -#include "encoder.hpp" - -namespace zmq -{ - // Encoder for 0MQ backend protocol. Converts messages into data batches. - - class zmq_encoder_t : public encoder_t <zmq_encoder_t> - { - public: - - zmq_encoder_t (size_t bufsize_); - ~zmq_encoder_t (); - - void set_inout (struct i_inout *source_); - - private: - - bool size_ready (); - bool message_ready (); - - struct i_inout *source; - ::zmq_msg_t in_progress; - unsigned char tmpbuf [10]; - - zmq_encoder_t (const zmq_encoder_t&); - void operator = (const zmq_encoder_t&); - }; -} - -#endif - diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 1023051..bdd2a5d 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -27,8 +27,8 @@ #include "i_engine.hpp" #include "io_object.hpp" #include "tcp_socket.hpp" -#include "zmq_encoder.hpp" -#include "zmq_decoder.hpp" +#include "encoder.hpp" +#include "decoder.hpp" #include "options.hpp" namespace zmq @@ -62,11 +62,11 @@ namespace zmq unsigned char *inpos; size_t insize; - zmq_decoder_t decoder; + decoder_t decoder; unsigned char *outpos; size_t outsize; - zmq_encoder_t encoder; + encoder_t encoder; i_inout *inout; |