diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile.am | 1 | ||||
| -rw-r--r-- | src/decoder.cpp | 10 | ||||
| -rw-r--r-- | src/decoder.hpp | 4 | ||||
| -rw-r--r-- | src/encoder.cpp | 10 | ||||
| -rw-r--r-- | src/encoder.hpp | 4 | ||||
| -rw-r--r-- | src/i_engine.hpp | 23 | ||||
| -rw-r--r-- | src/i_inout.hpp | 49 | ||||
| -rw-r--r-- | src/pgm_receiver.cpp | 13 | ||||
| -rw-r--r-- | src/pgm_receiver.hpp | 6 | ||||
| -rw-r--r-- | src/pgm_sender.cpp | 6 | ||||
| -rw-r--r-- | src/pgm_sender.hpp | 2 | ||||
| -rw-r--r-- | src/session.hpp | 6 | ||||
| -rw-r--r-- | src/zmq_engine.cpp | 43 | ||||
| -rw-r--r-- | src/zmq_engine.hpp | 8 | ||||
| -rw-r--r-- | src/zmq_init.hpp | 7 | 
15 files changed, 81 insertions, 111 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index ae20d33..45e8ac0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -24,7 +24,6 @@ libzmq_la_SOURCES = \      err.hpp \      fd.hpp \      fq.hpp \ -    i_inout.hpp \      io_object.hpp \      io_thread.hpp \      ip.hpp \ diff --git a/src/decoder.cpp b/src/decoder.cpp index bcf5974..8fc1d5e 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -22,13 +22,13 @@  #include <string.h>  #include "decoder.hpp" -#include "i_inout.hpp" +#include "i_engine.hpp"  #include "wire.hpp"  #include "err.hpp"  zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) :      decoder_base_t <decoder_t> (bufsize_), -    destination (NULL), +    sink (NULL),      maxmsgsize (maxmsgsize_)  {      int rc = in_progress.init (); @@ -44,9 +44,9 @@ zmq::decoder_t::~decoder_t ()      errno_assert (rc == 0);  } -void zmq::decoder_t::set_inout (i_inout *destination_) +void zmq::decoder_t::set_sink (i_engine_sink *sink_)  { -    destination = destination_; +    sink = sink_;  }  bool zmq::decoder_t::one_byte_size_ready () @@ -136,7 +136,7 @@ bool zmq::decoder_t::message_ready ()  {      //  Message is completely read. Push it further and start reading      //  new message. (in_progress is a 0-byte message after this point.) -    if (!destination || !destination->write (&in_progress)) +    if (!sink || !sink->write (&in_progress))          return false;      next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready); diff --git a/src/decoder.hpp b/src/decoder.hpp index 114ecef..17fab4b 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -184,7 +184,7 @@ namespace zmq          decoder_t (size_t bufsize_, int64_t maxmsgsize_);          ~decoder_t (); -        void set_inout (struct i_inout *destination_); +        void set_sink (struct i_engine_sink *sink_);      private: @@ -193,7 +193,7 @@ namespace zmq          bool flags_ready ();          bool message_ready (); -        struct i_inout *destination; +        struct i_engine_sink *sink;          unsigned char tmpbuf [8];          msg_t in_progress; diff --git a/src/encoder.cpp b/src/encoder.cpp index f579deb..a9be68c 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -19,12 +19,12 @@  */  #include "encoder.hpp" -#include "i_inout.hpp" +#include "i_engine.hpp"  #include "wire.hpp"  zmq::encoder_t::encoder_t (size_t bufsize_) :      encoder_base_t <encoder_t> (bufsize_), -    source (NULL) +    sink (NULL)  {      int rc = in_progress.init ();      errno_assert (rc == 0); @@ -39,9 +39,9 @@ zmq::encoder_t::~encoder_t ()      errno_assert (rc == 0);  } -void zmq::encoder_t::set_inout (i_inout *source_) +void zmq::encoder_t::set_sink (i_engine_sink *sink_)  { -    source = source_; +    sink = sink_;  }  bool zmq::encoder_t::size_ready () @@ -62,7 +62,7 @@ bool zmq::encoder_t::message_ready ()      //  Note that new state is set only if write is successful. That way      //  unsuccessful write will cause retry on the next state machine      //  invocation. -    if (!source || !source->read (&in_progress)) { +    if (!sink || !sink->read (&in_progress)) {          rc = in_progress.init ();          errno_assert (rc == 0);          return false; diff --git a/src/encoder.hpp b/src/encoder.hpp index 90b5ffe..4bc22ce 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -163,14 +163,14 @@ namespace zmq          encoder_t (size_t bufsize_);          ~encoder_t (); -        void set_inout (struct i_inout *source_); +        void set_sink (struct i_engine_sink *sink_);      private:          bool size_ready ();          bool message_ready (); -        struct i_inout *source; +        struct i_engine_sink *sink;          msg_t in_progress;          unsigned char tmpbuf [10]; diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 7bd4ea6..636985f 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -24,13 +24,15 @@  namespace zmq  { +    //  Abstract interface to be implemented by various engines. +      struct i_engine      {          virtual ~i_engine () {}          //  Plug the engine to the session.          virtual void plug (class io_thread_t *io_thread_, -            struct i_inout *inout_) = 0; +            struct i_engine_sink *sink_) = 0;          //  Unplug the engine from the session.          virtual void unplug () = 0; @@ -48,6 +50,25 @@ namespace zmq          virtual void activate_out () = 0;      }; +    //  Abstract interface to be implemented by engine sinks such as sessions. + +    struct i_engine_sink +    { +        virtual ~i_engine_sink () {} + +        //  Engine asks for a message to send to the network. +        virtual bool read (class msg_t *msg_) = 0; + +        //  Engine received message from the network and sends it further on. +        virtual bool write (class msg_t *msg_) = 0; + +        //  Flush all the previously written messages. +        virtual void flush () = 0; + +        //  Engine is dead. Drop all the references to it. +        virtual void detach () = 0; +    }; +  }  #endif diff --git a/src/i_inout.hpp b/src/i_inout.hpp deleted file mode 100644 index 3f8e8e0..0000000 --- a/src/i_inout.hpp +++ /dev/null @@ -1,49 +0,0 @@ -/* -    Copyright (c) 2007-2011 iMatix Corporation -    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    GNU Lesser General Public License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_I_INOUT_HPP_INCLUDED__ -#define __ZMQ_I_INOUT_HPP_INCLUDED__ - -#include "msg.hpp" -#include "stdint.hpp" - -namespace zmq -{ - -    struct i_inout -    { -        virtual ~i_inout () {} - -        //  Engine asks for a message to send to the network. -        virtual bool read (msg_t *msg_) = 0; - -        //  Engine received message from the network and sends it further on. -        virtual bool write (msg_t *msg_) = 0; - -        //  Flush all the previously written messages. -        virtual void flush () = 0; - -        //  Engine is dead. Drop all the references to it. -        virtual void detach () = 0; -    }; - -} - -#endif diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index bb79ece..1fd687a 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -32,7 +32,6 @@  #include "err.hpp"  #include "stdint.hpp"  #include "wire.hpp" -#include "i_inout.hpp"  zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,         const options_t &options_) : @@ -40,7 +39,7 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,      has_rx_timer (false),      pgm_socket (true, options_),      options (options_), -    inout (NULL), +    sink (NULL),      mru_decoder (NULL),      pending_bytes (0)  { @@ -57,7 +56,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)      return pgm_socket.init (udp_encapsulation_, network_);  } -void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_) +void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)  {      //  Retrieve PGM fds and start polling.      int socket_fd; @@ -68,7 +67,7 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_)      set_pollin (pipe_handle);      set_pollin (socket_handle); -    inout = inout_; +    sink = sink_;  }  void zmq::pgm_receiver_t::unplug () @@ -91,7 +90,7 @@ void zmq::pgm_receiver_t::unplug ()      rm_fd (socket_handle);      rm_fd (pipe_handle); -    inout = NULL; +    sink = NULL;  }  void zmq::pgm_receiver_t::terminate () @@ -218,7 +217,7 @@ void zmq::pgm_receiver_t::in_event ()              it->second.decoder = new (std::nothrow) decoder_t (0,                  options.maxmsgsize);              alloc_assert (it->second.decoder); -            it->second.decoder->set_inout (inout); +            it->second.decoder->set_sink (sink);          }          mru_decoder = it->second.decoder; @@ -244,7 +243,7 @@ void zmq::pgm_receiver_t::in_event ()      }      //  Flush any messages decoder may have produced. -    inout->flush (); +    sink->flush ();  }  void zmq::pgm_receiver_t::timer_event (int token) diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 172557f..825e0c1 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -52,7 +52,7 @@ namespace zmq          int init (bool udp_encapsulation_, const char *network_);          //  i_engine interface implementation. -        void plug (class io_thread_t *io_thread_, struct i_inout *inout_); +        void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_);          void unplug ();          void terminate ();          void activate_in (); @@ -100,8 +100,8 @@ namespace zmq          //  Socket options.          options_t options; -        //  Parent session. -        i_inout *inout; +        //  Associated session. +        i_engine_sink *sink;          //  Most recently used decoder.          decoder_t *mru_decoder; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 4d76433..314a0b4 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -61,7 +61,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)      return rc;  } -void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_) +void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)  {      //  Alocate 2 fds for PGM socket.      int downlink_socket_fd = 0; @@ -69,7 +69,7 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_)      int rdata_notify_fd = 0;      int pending_notify_fd = 0; -    encoder.set_inout (inout_); +    encoder.set_sink (sink_);      //  Fill fds from PGM transport and add them to the poller.      pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, @@ -106,7 +106,7 @@ void zmq::pgm_sender_t::unplug ()      rm_fd (uplink_handle);      rm_fd (rdata_notify_handle);      rm_fd (pending_notify_handle); -    encoder.set_inout (NULL); +    encoder.set_sink (NULL);  }  void zmq::pgm_sender_t::terminate () diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 4a21b3f..c29dd12 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -50,7 +50,7 @@ namespace zmq          int init (bool udp_encapsulation_, const char *network_);          //  i_engine interface implementation. -        void plug (class io_thread_t *io_thread_, struct i_inout *inout_); +        void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_);          void unplug ();          void terminate ();          void activate_in (); diff --git a/src/session.hpp b/src/session.hpp index 1e32722..4c17930 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -22,7 +22,7 @@  #define __ZMQ_SESSION_HPP_INCLUDED__  #include "own.hpp" -#include "i_inout.hpp" +#include "i_engine.hpp"  #include "io_object.hpp"  #include "blob.hpp"  #include "pipe.hpp" @@ -33,7 +33,7 @@ namespace zmq      class session_t :          public own_t,          public io_object_t, -        public i_inout, +        public i_engine_sink,          public i_pipe_events      {      public: @@ -44,7 +44,7 @@ namespace zmq          //  To be used once only, when creating the session.          void attach_pipe (class pipe_t *pipe_); -        //  i_inout interface implementation. +        //  i_engine_sink interface implementation.          bool read (msg_t *msg_);          bool write (msg_t *msg_);          void flush (); diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 725ba96..b0a7df1 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -29,7 +29,6 @@  #include "zmq_engine.hpp"  #include "zmq_connecter.hpp"  #include "io_thread.hpp" -#include "i_inout.hpp"  #include "config.hpp"  #include "err.hpp" @@ -40,8 +39,8 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) :      outpos (NULL),      outsize (0),      encoder (out_batch_size), -    inout (NULL), -    ephemeral_inout (NULL), +    sink (NULL), +    ephemeral_sink (NULL),      options (options_),      plugged (false)  { @@ -55,18 +54,18 @@ zmq::zmq_engine_t::~zmq_engine_t ()      zmq_assert (!plugged);  } -void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_) +void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)  {      zmq_assert (!plugged);      plugged = true; -    ephemeral_inout = NULL; +    ephemeral_sink = NULL;      //  Connect to session/init object. -    zmq_assert (!inout); -    zmq_assert (inout_); -    encoder.set_inout (inout_); -    decoder.set_inout (inout_); -    inout = inout_; +    zmq_assert (!sink); +    zmq_assert (sink_); +    encoder.set_sink (sink_); +    decoder.set_sink (sink_); +    sink = sink_;      //  Connect to I/O threads poller object.      io_object_t::plug (io_thread_); @@ -90,10 +89,10 @@ void zmq::zmq_engine_t::unplug ()      io_object_t::unplug ();      //  Disconnect from init/session object. -    encoder.set_inout (NULL); -    decoder.set_inout (NULL); -    ephemeral_inout = inout; -    inout = NULL; +    encoder.set_sink (NULL); +    decoder.set_sink (NULL); +    ephemeral_sink = sink; +    sink = NULL;  }  void zmq::zmq_engine_t::terminate () @@ -149,13 +148,13 @@ void zmq::zmq_engine_t::in_event ()      //  Flush all messages the decoder may have produced.      //  If IO handler has unplugged engine, flush transient IO handler.      if (unlikely (!plugged)) { -        zmq_assert (ephemeral_inout); -        ephemeral_inout->flush (); +        zmq_assert (ephemeral_sink); +        ephemeral_sink->flush ();      } else { -        inout->flush (); +        sink->flush ();      } -    if (inout && disconnection) +    if (sink && disconnection)          error ();  } @@ -169,8 +168,8 @@ void zmq::zmq_engine_t::out_event ()          //  If IO handler has unplugged engine, flush transient IO handler.          if (unlikely (!plugged)) { -            zmq_assert (ephemeral_inout); -            ephemeral_inout->flush (); +            zmq_assert (ephemeral_sink); +            ephemeral_sink->flush ();              return;          } @@ -219,8 +218,8 @@ void zmq::zmq_engine_t::activate_in ()  void zmq::zmq_engine_t::error ()  { -    zmq_assert (inout); -    inout->detach (); +    zmq_assert (sink); +    sink->detach ();      unplug ();      delete this;  } diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 47073cc..7f09775 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -43,7 +43,7 @@ namespace zmq          ~zmq_engine_t ();          //  i_engine interface implementation. -        void plug (class io_thread_t *io_thread_, struct i_inout *inout_); +        void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_);          void unplug ();          void terminate ();          void activate_in (); @@ -69,10 +69,10 @@ namespace zmq          size_t outsize;          encoder_t encoder; -        i_inout *inout; +        i_engine_sink *sink; -        //  Detached transient inout handler. -        i_inout *ephemeral_inout; +        //  Detached transient sink. +        i_engine_sink *ephemeral_sink;          options_t options; diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp index ec9b2b3..3324f5e 100644 --- a/src/zmq_init.hpp +++ b/src/zmq_init.hpp @@ -23,7 +23,6 @@  #include <vector> -#include "i_inout.hpp"  #include "i_engine.hpp"  #include "stdint.hpp"  #include "blob.hpp" @@ -36,7 +35,9 @@ namespace zmq      //  The class handles initialisation phase of 0MQ wire-level protocol. -    class zmq_init_t : public own_t, public i_inout +    class zmq_init_t : +        public own_t, +        public i_engine_sink      {      public: @@ -56,7 +57,7 @@ namespace zmq          void finalise_initialisation ();          void dispatch_engine (); -        //  i_inout interface implementation. +        //  i_engine_sink interface implementation.          bool read (class msg_t *msg_);          bool write (class msg_t *msg_);          void flush (); | 
