summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Makefile.am1
-rw-r--r--src/decoder.cpp10
-rw-r--r--src/decoder.hpp4
-rw-r--r--src/encoder.cpp10
-rw-r--r--src/encoder.hpp4
-rw-r--r--src/i_engine.hpp23
-rw-r--r--src/i_inout.hpp49
-rw-r--r--src/pgm_receiver.cpp13
-rw-r--r--src/pgm_receiver.hpp6
-rw-r--r--src/pgm_sender.cpp6
-rw-r--r--src/pgm_sender.hpp2
-rw-r--r--src/session.hpp6
-rw-r--r--src/zmq_engine.cpp43
-rw-r--r--src/zmq_engine.hpp8
-rw-r--r--src/zmq_init.hpp7
15 files changed, 81 insertions, 111 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index ae20d33..45e8ac0 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -24,7 +24,6 @@ libzmq_la_SOURCES = \
err.hpp \
fd.hpp \
fq.hpp \
- i_inout.hpp \
io_object.hpp \
io_thread.hpp \
ip.hpp \
diff --git a/src/decoder.cpp b/src/decoder.cpp
index bcf5974..8fc1d5e 100644
--- a/src/decoder.cpp
+++ b/src/decoder.cpp
@@ -22,13 +22,13 @@
#include <string.h>
#include "decoder.hpp"
-#include "i_inout.hpp"
+#include "i_engine.hpp"
#include "wire.hpp"
#include "err.hpp"
zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
decoder_base_t <decoder_t> (bufsize_),
- destination (NULL),
+ sink (NULL),
maxmsgsize (maxmsgsize_)
{
int rc = in_progress.init ();
@@ -44,9 +44,9 @@ zmq::decoder_t::~decoder_t ()
errno_assert (rc == 0);
}
-void zmq::decoder_t::set_inout (i_inout *destination_)
+void zmq::decoder_t::set_sink (i_engine_sink *sink_)
{
- destination = destination_;
+ sink = sink_;
}
bool zmq::decoder_t::one_byte_size_ready ()
@@ -136,7 +136,7 @@ bool zmq::decoder_t::message_ready ()
{
// Message is completely read. Push it further and start reading
// new message. (in_progress is a 0-byte message after this point.)
- if (!destination || !destination->write (&in_progress))
+ if (!sink || !sink->write (&in_progress))
return false;
next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
diff --git a/src/decoder.hpp b/src/decoder.hpp
index 114ecef..17fab4b 100644
--- a/src/decoder.hpp
+++ b/src/decoder.hpp
@@ -184,7 +184,7 @@ namespace zmq
decoder_t (size_t bufsize_, int64_t maxmsgsize_);
~decoder_t ();
- void set_inout (struct i_inout *destination_);
+ void set_sink (struct i_engine_sink *sink_);
private:
@@ -193,7 +193,7 @@ namespace zmq
bool flags_ready ();
bool message_ready ();
- struct i_inout *destination;
+ struct i_engine_sink *sink;
unsigned char tmpbuf [8];
msg_t in_progress;
diff --git a/src/encoder.cpp b/src/encoder.cpp
index f579deb..a9be68c 100644
--- a/src/encoder.cpp
+++ b/src/encoder.cpp
@@ -19,12 +19,12 @@
*/
#include "encoder.hpp"
-#include "i_inout.hpp"
+#include "i_engine.hpp"
#include "wire.hpp"
zmq::encoder_t::encoder_t (size_t bufsize_) :
encoder_base_t <encoder_t> (bufsize_),
- source (NULL)
+ sink (NULL)
{
int rc = in_progress.init ();
errno_assert (rc == 0);
@@ -39,9 +39,9 @@ zmq::encoder_t::~encoder_t ()
errno_assert (rc == 0);
}
-void zmq::encoder_t::set_inout (i_inout *source_)
+void zmq::encoder_t::set_sink (i_engine_sink *sink_)
{
- source = source_;
+ sink = sink_;
}
bool zmq::encoder_t::size_ready ()
@@ -62,7 +62,7 @@ bool zmq::encoder_t::message_ready ()
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
- if (!source || !source->read (&in_progress)) {
+ if (!sink || !sink->read (&in_progress)) {
rc = in_progress.init ();
errno_assert (rc == 0);
return false;
diff --git a/src/encoder.hpp b/src/encoder.hpp
index 90b5ffe..4bc22ce 100644
--- a/src/encoder.hpp
+++ b/src/encoder.hpp
@@ -163,14 +163,14 @@ namespace zmq
encoder_t (size_t bufsize_);
~encoder_t ();
- void set_inout (struct i_inout *source_);
+ void set_sink (struct i_engine_sink *sink_);
private:
bool size_ready ();
bool message_ready ();
- struct i_inout *source;
+ struct i_engine_sink *sink;
msg_t in_progress;
unsigned char tmpbuf [10];
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index 7bd4ea6..636985f 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -24,13 +24,15 @@
namespace zmq
{
+ // Abstract interface to be implemented by various engines.
+
struct i_engine
{
virtual ~i_engine () {}
// Plug the engine to the session.
virtual void plug (class io_thread_t *io_thread_,
- struct i_inout *inout_) = 0;
+ struct i_engine_sink *sink_) = 0;
// Unplug the engine from the session.
virtual void unplug () = 0;
@@ -48,6 +50,25 @@ namespace zmq
virtual void activate_out () = 0;
};
+ // Abstract interface to be implemented by engine sinks such as sessions.
+
+ struct i_engine_sink
+ {
+ virtual ~i_engine_sink () {}
+
+ // Engine asks for a message to send to the network.
+ virtual bool read (class msg_t *msg_) = 0;
+
+ // Engine received message from the network and sends it further on.
+ virtual bool write (class msg_t *msg_) = 0;
+
+ // Flush all the previously written messages.
+ virtual void flush () = 0;
+
+ // Engine is dead. Drop all the references to it.
+ virtual void detach () = 0;
+ };
+
}
#endif
diff --git a/src/i_inout.hpp b/src/i_inout.hpp
deleted file mode 100644
index 3f8e8e0..0000000
--- a/src/i_inout.hpp
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_I_INOUT_HPP_INCLUDED__
-#define __ZMQ_I_INOUT_HPP_INCLUDED__
-
-#include "msg.hpp"
-#include "stdint.hpp"
-
-namespace zmq
-{
-
- struct i_inout
- {
- virtual ~i_inout () {}
-
- // Engine asks for a message to send to the network.
- virtual bool read (msg_t *msg_) = 0;
-
- // Engine received message from the network and sends it further on.
- virtual bool write (msg_t *msg_) = 0;
-
- // Flush all the previously written messages.
- virtual void flush () = 0;
-
- // Engine is dead. Drop all the references to it.
- virtual void detach () = 0;
- };
-
-}
-
-#endif
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index bb79ece..1fd687a 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -32,7 +32,6 @@
#include "err.hpp"
#include "stdint.hpp"
#include "wire.hpp"
-#include "i_inout.hpp"
zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
const options_t &options_) :
@@ -40,7 +39,7 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
has_rx_timer (false),
pgm_socket (true, options_),
options (options_),
- inout (NULL),
+ sink (NULL),
mru_decoder (NULL),
pending_bytes (0)
{
@@ -57,7 +56,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
return pgm_socket.init (udp_encapsulation_, network_);
}
-void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_)
+void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
{
// Retrieve PGM fds and start polling.
int socket_fd;
@@ -68,7 +67,7 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_)
set_pollin (pipe_handle);
set_pollin (socket_handle);
- inout = inout_;
+ sink = sink_;
}
void zmq::pgm_receiver_t::unplug ()
@@ -91,7 +90,7 @@ void zmq::pgm_receiver_t::unplug ()
rm_fd (socket_handle);
rm_fd (pipe_handle);
- inout = NULL;
+ sink = NULL;
}
void zmq::pgm_receiver_t::terminate ()
@@ -218,7 +217,7 @@ void zmq::pgm_receiver_t::in_event ()
it->second.decoder = new (std::nothrow) decoder_t (0,
options.maxmsgsize);
alloc_assert (it->second.decoder);
- it->second.decoder->set_inout (inout);
+ it->second.decoder->set_sink (sink);
}
mru_decoder = it->second.decoder;
@@ -244,7 +243,7 @@ void zmq::pgm_receiver_t::in_event ()
}
// Flush any messages decoder may have produced.
- inout->flush ();
+ sink->flush ();
}
void zmq::pgm_receiver_t::timer_event (int token)
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index 172557f..825e0c1 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -52,7 +52,7 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation.
- void plug (class io_thread_t *io_thread_, struct i_inout *inout_);
+ void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_);
void unplug ();
void terminate ();
void activate_in ();
@@ -100,8 +100,8 @@ namespace zmq
// Socket options.
options_t options;
- // Parent session.
- i_inout *inout;
+ // Associated session.
+ i_engine_sink *sink;
// Most recently used decoder.
decoder_t *mru_decoder;
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 4d76433..314a0b4 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -61,7 +61,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
return rc;
}
-void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_)
+void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
{
// Alocate 2 fds for PGM socket.
int downlink_socket_fd = 0;
@@ -69,7 +69,7 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_)
int rdata_notify_fd = 0;
int pending_notify_fd = 0;
- encoder.set_inout (inout_);
+ encoder.set_sink (sink_);
// Fill fds from PGM transport and add them to the poller.
pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
@@ -106,7 +106,7 @@ void zmq::pgm_sender_t::unplug ()
rm_fd (uplink_handle);
rm_fd (rdata_notify_handle);
rm_fd (pending_notify_handle);
- encoder.set_inout (NULL);
+ encoder.set_sink (NULL);
}
void zmq::pgm_sender_t::terminate ()
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 4a21b3f..c29dd12 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -50,7 +50,7 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation.
- void plug (class io_thread_t *io_thread_, struct i_inout *inout_);
+ void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_);
void unplug ();
void terminate ();
void activate_in ();
diff --git a/src/session.hpp b/src/session.hpp
index 1e32722..4c17930 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -22,7 +22,7 @@
#define __ZMQ_SESSION_HPP_INCLUDED__
#include "own.hpp"
-#include "i_inout.hpp"
+#include "i_engine.hpp"
#include "io_object.hpp"
#include "blob.hpp"
#include "pipe.hpp"
@@ -33,7 +33,7 @@ namespace zmq
class session_t :
public own_t,
public io_object_t,
- public i_inout,
+ public i_engine_sink,
public i_pipe_events
{
public:
@@ -44,7 +44,7 @@ namespace zmq
// To be used once only, when creating the session.
void attach_pipe (class pipe_t *pipe_);
- // i_inout interface implementation.
+ // i_engine_sink interface implementation.
bool read (msg_t *msg_);
bool write (msg_t *msg_);
void flush ();
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 725ba96..b0a7df1 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -29,7 +29,6 @@
#include "zmq_engine.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp"
-#include "i_inout.hpp"
#include "config.hpp"
#include "err.hpp"
@@ -40,8 +39,8 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) :
outpos (NULL),
outsize (0),
encoder (out_batch_size),
- inout (NULL),
- ephemeral_inout (NULL),
+ sink (NULL),
+ ephemeral_sink (NULL),
options (options_),
plugged (false)
{
@@ -55,18 +54,18 @@ zmq::zmq_engine_t::~zmq_engine_t ()
zmq_assert (!plugged);
}
-void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_)
+void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
{
zmq_assert (!plugged);
plugged = true;
- ephemeral_inout = NULL;
+ ephemeral_sink = NULL;
// Connect to session/init object.
- zmq_assert (!inout);
- zmq_assert (inout_);
- encoder.set_inout (inout_);
- decoder.set_inout (inout_);
- inout = inout_;
+ zmq_assert (!sink);
+ zmq_assert (sink_);
+ encoder.set_sink (sink_);
+ decoder.set_sink (sink_);
+ sink = sink_;
// Connect to I/O threads poller object.
io_object_t::plug (io_thread_);
@@ -90,10 +89,10 @@ void zmq::zmq_engine_t::unplug ()
io_object_t::unplug ();
// Disconnect from init/session object.
- encoder.set_inout (NULL);
- decoder.set_inout (NULL);
- ephemeral_inout = inout;
- inout = NULL;
+ encoder.set_sink (NULL);
+ decoder.set_sink (NULL);
+ ephemeral_sink = sink;
+ sink = NULL;
}
void zmq::zmq_engine_t::terminate ()
@@ -149,13 +148,13 @@ void zmq::zmq_engine_t::in_event ()
// Flush all messages the decoder may have produced.
// If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) {
- zmq_assert (ephemeral_inout);
- ephemeral_inout->flush ();
+ zmq_assert (ephemeral_sink);
+ ephemeral_sink->flush ();
} else {
- inout->flush ();
+ sink->flush ();
}
- if (inout && disconnection)
+ if (sink && disconnection)
error ();
}
@@ -169,8 +168,8 @@ void zmq::zmq_engine_t::out_event ()
// If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) {
- zmq_assert (ephemeral_inout);
- ephemeral_inout->flush ();
+ zmq_assert (ephemeral_sink);
+ ephemeral_sink->flush ();
return;
}
@@ -219,8 +218,8 @@ void zmq::zmq_engine_t::activate_in ()
void zmq::zmq_engine_t::error ()
{
- zmq_assert (inout);
- inout->detach ();
+ zmq_assert (sink);
+ sink->detach ();
unplug ();
delete this;
}
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index 47073cc..7f09775 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -43,7 +43,7 @@ namespace zmq
~zmq_engine_t ();
// i_engine interface implementation.
- void plug (class io_thread_t *io_thread_, struct i_inout *inout_);
+ void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_);
void unplug ();
void terminate ();
void activate_in ();
@@ -69,10 +69,10 @@ namespace zmq
size_t outsize;
encoder_t encoder;
- i_inout *inout;
+ i_engine_sink *sink;
- // Detached transient inout handler.
- i_inout *ephemeral_inout;
+ // Detached transient sink.
+ i_engine_sink *ephemeral_sink;
options_t options;
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index ec9b2b3..3324f5e 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -23,7 +23,6 @@
#include <vector>
-#include "i_inout.hpp"
#include "i_engine.hpp"
#include "stdint.hpp"
#include "blob.hpp"
@@ -36,7 +35,9 @@ namespace zmq
// The class handles initialisation phase of 0MQ wire-level protocol.
- class zmq_init_t : public own_t, public i_inout
+ class zmq_init_t :
+ public own_t,
+ public i_engine_sink
{
public:
@@ -56,7 +57,7 @@ namespace zmq
void finalise_initialisation ();
void dispatch_engine ();
- // i_inout interface implementation.
+ // i_engine_sink interface implementation.
bool read (class msg_t *msg_);
bool write (class msg_t *msg_);
void flush ();