diff options
-rw-r--r-- | src/Makefile.am | 1 | ||||
-rw-r--r-- | src/decoder.cpp | 10 | ||||
-rw-r--r-- | src/decoder.hpp | 4 | ||||
-rw-r--r-- | src/encoder.cpp | 10 | ||||
-rw-r--r-- | src/encoder.hpp | 4 | ||||
-rw-r--r-- | src/i_engine.hpp | 23 | ||||
-rw-r--r-- | src/i_inout.hpp | 49 | ||||
-rw-r--r-- | src/pgm_receiver.cpp | 13 | ||||
-rw-r--r-- | src/pgm_receiver.hpp | 6 | ||||
-rw-r--r-- | src/pgm_sender.cpp | 6 | ||||
-rw-r--r-- | src/pgm_sender.hpp | 2 | ||||
-rw-r--r-- | src/session.hpp | 6 | ||||
-rw-r--r-- | src/zmq_engine.cpp | 43 | ||||
-rw-r--r-- | src/zmq_engine.hpp | 8 | ||||
-rw-r--r-- | src/zmq_init.hpp | 7 |
15 files changed, 81 insertions, 111 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index ae20d33..45e8ac0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -24,7 +24,6 @@ libzmq_la_SOURCES = \ err.hpp \ fd.hpp \ fq.hpp \ - i_inout.hpp \ io_object.hpp \ io_thread.hpp \ ip.hpp \ diff --git a/src/decoder.cpp b/src/decoder.cpp index bcf5974..8fc1d5e 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -22,13 +22,13 @@ #include <string.h> #include "decoder.hpp" -#include "i_inout.hpp" +#include "i_engine.hpp" #include "wire.hpp" #include "err.hpp" zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) : decoder_base_t <decoder_t> (bufsize_), - destination (NULL), + sink (NULL), maxmsgsize (maxmsgsize_) { int rc = in_progress.init (); @@ -44,9 +44,9 @@ zmq::decoder_t::~decoder_t () errno_assert (rc == 0); } -void zmq::decoder_t::set_inout (i_inout *destination_) +void zmq::decoder_t::set_sink (i_engine_sink *sink_) { - destination = destination_; + sink = sink_; } bool zmq::decoder_t::one_byte_size_ready () @@ -136,7 +136,7 @@ bool zmq::decoder_t::message_ready () { // Message is completely read. Push it further and start reading // new message. (in_progress is a 0-byte message after this point.) - if (!destination || !destination->write (&in_progress)) + if (!sink || !sink->write (&in_progress)) return false; next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready); diff --git a/src/decoder.hpp b/src/decoder.hpp index 114ecef..17fab4b 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -184,7 +184,7 @@ namespace zmq decoder_t (size_t bufsize_, int64_t maxmsgsize_); ~decoder_t (); - void set_inout (struct i_inout *destination_); + void set_sink (struct i_engine_sink *sink_); private: @@ -193,7 +193,7 @@ namespace zmq bool flags_ready (); bool message_ready (); - struct i_inout *destination; + struct i_engine_sink *sink; unsigned char tmpbuf [8]; msg_t in_progress; diff --git a/src/encoder.cpp b/src/encoder.cpp index f579deb..a9be68c 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -19,12 +19,12 @@ */ #include "encoder.hpp" -#include "i_inout.hpp" +#include "i_engine.hpp" #include "wire.hpp" zmq::encoder_t::encoder_t (size_t bufsize_) : encoder_base_t <encoder_t> (bufsize_), - source (NULL) + sink (NULL) { int rc = in_progress.init (); errno_assert (rc == 0); @@ -39,9 +39,9 @@ zmq::encoder_t::~encoder_t () errno_assert (rc == 0); } -void zmq::encoder_t::set_inout (i_inout *source_) +void zmq::encoder_t::set_sink (i_engine_sink *sink_) { - source = source_; + sink = sink_; } bool zmq::encoder_t::size_ready () @@ -62,7 +62,7 @@ bool zmq::encoder_t::message_ready () // Note that new state is set only if write is successful. That way // unsuccessful write will cause retry on the next state machine // invocation. - if (!source || !source->read (&in_progress)) { + if (!sink || !sink->read (&in_progress)) { rc = in_progress.init (); errno_assert (rc == 0); return false; diff --git a/src/encoder.hpp b/src/encoder.hpp index 90b5ffe..4bc22ce 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -163,14 +163,14 @@ namespace zmq encoder_t (size_t bufsize_); ~encoder_t (); - void set_inout (struct i_inout *source_); + void set_sink (struct i_engine_sink *sink_); private: bool size_ready (); bool message_ready (); - struct i_inout *source; + struct i_engine_sink *sink; msg_t in_progress; unsigned char tmpbuf [10]; diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 7bd4ea6..636985f 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -24,13 +24,15 @@ namespace zmq { + // Abstract interface to be implemented by various engines. + struct i_engine { virtual ~i_engine () {} // Plug the engine to the session. virtual void plug (class io_thread_t *io_thread_, - struct i_inout *inout_) = 0; + struct i_engine_sink *sink_) = 0; // Unplug the engine from the session. virtual void unplug () = 0; @@ -48,6 +50,25 @@ namespace zmq virtual void activate_out () = 0; }; + // Abstract interface to be implemented by engine sinks such as sessions. + + struct i_engine_sink + { + virtual ~i_engine_sink () {} + + // Engine asks for a message to send to the network. + virtual bool read (class msg_t *msg_) = 0; + + // Engine received message from the network and sends it further on. + virtual bool write (class msg_t *msg_) = 0; + + // Flush all the previously written messages. + virtual void flush () = 0; + + // Engine is dead. Drop all the references to it. + virtual void detach () = 0; + }; + } #endif diff --git a/src/i_inout.hpp b/src/i_inout.hpp deleted file mode 100644 index 3f8e8e0..0000000 --- a/src/i_inout.hpp +++ /dev/null @@ -1,49 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_I_INOUT_HPP_INCLUDED__ -#define __ZMQ_I_INOUT_HPP_INCLUDED__ - -#include "msg.hpp" -#include "stdint.hpp" - -namespace zmq -{ - - struct i_inout - { - virtual ~i_inout () {} - - // Engine asks for a message to send to the network. - virtual bool read (msg_t *msg_) = 0; - - // Engine received message from the network and sends it further on. - virtual bool write (msg_t *msg_) = 0; - - // Flush all the previously written messages. - virtual void flush () = 0; - - // Engine is dead. Drop all the references to it. - virtual void detach () = 0; - }; - -} - -#endif diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index bb79ece..1fd687a 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -32,7 +32,6 @@ #include "err.hpp" #include "stdint.hpp" #include "wire.hpp" -#include "i_inout.hpp" zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, const options_t &options_) : @@ -40,7 +39,7 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, has_rx_timer (false), pgm_socket (true, options_), options (options_), - inout (NULL), + sink (NULL), mru_decoder (NULL), pending_bytes (0) { @@ -57,7 +56,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) return pgm_socket.init (udp_encapsulation_, network_); } -void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_) +void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) { // Retrieve PGM fds and start polling. int socket_fd; @@ -68,7 +67,7 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_) set_pollin (pipe_handle); set_pollin (socket_handle); - inout = inout_; + sink = sink_; } void zmq::pgm_receiver_t::unplug () @@ -91,7 +90,7 @@ void zmq::pgm_receiver_t::unplug () rm_fd (socket_handle); rm_fd (pipe_handle); - inout = NULL; + sink = NULL; } void zmq::pgm_receiver_t::terminate () @@ -218,7 +217,7 @@ void zmq::pgm_receiver_t::in_event () it->second.decoder = new (std::nothrow) decoder_t (0, options.maxmsgsize); alloc_assert (it->second.decoder); - it->second.decoder->set_inout (inout); + it->second.decoder->set_sink (sink); } mru_decoder = it->second.decoder; @@ -244,7 +243,7 @@ void zmq::pgm_receiver_t::in_event () } // Flush any messages decoder may have produced. - inout->flush (); + sink->flush (); } void zmq::pgm_receiver_t::timer_event (int token) diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 172557f..825e0c1 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -52,7 +52,7 @@ namespace zmq int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. - void plug (class io_thread_t *io_thread_, struct i_inout *inout_); + void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_); void unplug (); void terminate (); void activate_in (); @@ -100,8 +100,8 @@ namespace zmq // Socket options. options_t options; - // Parent session. - i_inout *inout; + // Associated session. + i_engine_sink *sink; // Most recently used decoder. decoder_t *mru_decoder; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 4d76433..314a0b4 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -61,7 +61,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) return rc; } -void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_) +void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) { // Alocate 2 fds for PGM socket. int downlink_socket_fd = 0; @@ -69,7 +69,7 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_) int rdata_notify_fd = 0; int pending_notify_fd = 0; - encoder.set_inout (inout_); + encoder.set_sink (sink_); // Fill fds from PGM transport and add them to the poller. pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, @@ -106,7 +106,7 @@ void zmq::pgm_sender_t::unplug () rm_fd (uplink_handle); rm_fd (rdata_notify_handle); rm_fd (pending_notify_handle); - encoder.set_inout (NULL); + encoder.set_sink (NULL); } void zmq::pgm_sender_t::terminate () diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 4a21b3f..c29dd12 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -50,7 +50,7 @@ namespace zmq int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. - void plug (class io_thread_t *io_thread_, struct i_inout *inout_); + void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_); void unplug (); void terminate (); void activate_in (); diff --git a/src/session.hpp b/src/session.hpp index 1e32722..4c17930 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -22,7 +22,7 @@ #define __ZMQ_SESSION_HPP_INCLUDED__ #include "own.hpp" -#include "i_inout.hpp" +#include "i_engine.hpp" #include "io_object.hpp" #include "blob.hpp" #include "pipe.hpp" @@ -33,7 +33,7 @@ namespace zmq class session_t : public own_t, public io_object_t, - public i_inout, + public i_engine_sink, public i_pipe_events { public: @@ -44,7 +44,7 @@ namespace zmq // To be used once only, when creating the session. void attach_pipe (class pipe_t *pipe_); - // i_inout interface implementation. + // i_engine_sink interface implementation. bool read (msg_t *msg_); bool write (msg_t *msg_); void flush (); diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 725ba96..b0a7df1 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -29,7 +29,6 @@ #include "zmq_engine.hpp" #include "zmq_connecter.hpp" #include "io_thread.hpp" -#include "i_inout.hpp" #include "config.hpp" #include "err.hpp" @@ -40,8 +39,8 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : outpos (NULL), outsize (0), encoder (out_batch_size), - inout (NULL), - ephemeral_inout (NULL), + sink (NULL), + ephemeral_sink (NULL), options (options_), plugged (false) { @@ -55,18 +54,18 @@ zmq::zmq_engine_t::~zmq_engine_t () zmq_assert (!plugged); } -void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_) +void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) { zmq_assert (!plugged); plugged = true; - ephemeral_inout = NULL; + ephemeral_sink = NULL; // Connect to session/init object. - zmq_assert (!inout); - zmq_assert (inout_); - encoder.set_inout (inout_); - decoder.set_inout (inout_); - inout = inout_; + zmq_assert (!sink); + zmq_assert (sink_); + encoder.set_sink (sink_); + decoder.set_sink (sink_); + sink = sink_; // Connect to I/O threads poller object. io_object_t::plug (io_thread_); @@ -90,10 +89,10 @@ void zmq::zmq_engine_t::unplug () io_object_t::unplug (); // Disconnect from init/session object. - encoder.set_inout (NULL); - decoder.set_inout (NULL); - ephemeral_inout = inout; - inout = NULL; + encoder.set_sink (NULL); + decoder.set_sink (NULL); + ephemeral_sink = sink; + sink = NULL; } void zmq::zmq_engine_t::terminate () @@ -149,13 +148,13 @@ void zmq::zmq_engine_t::in_event () // Flush all messages the decoder may have produced. // If IO handler has unplugged engine, flush transient IO handler. if (unlikely (!plugged)) { - zmq_assert (ephemeral_inout); - ephemeral_inout->flush (); + zmq_assert (ephemeral_sink); + ephemeral_sink->flush (); } else { - inout->flush (); + sink->flush (); } - if (inout && disconnection) + if (sink && disconnection) error (); } @@ -169,8 +168,8 @@ void zmq::zmq_engine_t::out_event () // If IO handler has unplugged engine, flush transient IO handler. if (unlikely (!plugged)) { - zmq_assert (ephemeral_inout); - ephemeral_inout->flush (); + zmq_assert (ephemeral_sink); + ephemeral_sink->flush (); return; } @@ -219,8 +218,8 @@ void zmq::zmq_engine_t::activate_in () void zmq::zmq_engine_t::error () { - zmq_assert (inout); - inout->detach (); + zmq_assert (sink); + sink->detach (); unplug (); delete this; } diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 47073cc..7f09775 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -43,7 +43,7 @@ namespace zmq ~zmq_engine_t (); // i_engine interface implementation. - void plug (class io_thread_t *io_thread_, struct i_inout *inout_); + void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_); void unplug (); void terminate (); void activate_in (); @@ -69,10 +69,10 @@ namespace zmq size_t outsize; encoder_t encoder; - i_inout *inout; + i_engine_sink *sink; - // Detached transient inout handler. - i_inout *ephemeral_inout; + // Detached transient sink. + i_engine_sink *ephemeral_sink; options_t options; diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp index ec9b2b3..3324f5e 100644 --- a/src/zmq_init.hpp +++ b/src/zmq_init.hpp @@ -23,7 +23,6 @@ #include <vector> -#include "i_inout.hpp" #include "i_engine.hpp" #include "stdint.hpp" #include "blob.hpp" @@ -36,7 +35,9 @@ namespace zmq // The class handles initialisation phase of 0MQ wire-level protocol. - class zmq_init_t : public own_t, public i_inout + class zmq_init_t : + public own_t, + public i_engine_sink { public: @@ -56,7 +57,7 @@ namespace zmq void finalise_initialisation (); void dispatch_engine (); - // i_inout interface implementation. + // i_engine_sink interface implementation. bool read (class msg_t *msg_); bool write (class msg_t *msg_); void flush (); |