diff options
Diffstat (limited to 'src/zmq_engine.cpp')
-rw-r--r-- | src/zmq_engine.cpp | 43 |
1 files changed, 21 insertions, 22 deletions
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 725ba96..b0a7df1 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -29,7 +29,6 @@ #include "zmq_engine.hpp" #include "zmq_connecter.hpp" #include "io_thread.hpp" -#include "i_inout.hpp" #include "config.hpp" #include "err.hpp" @@ -40,8 +39,8 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : outpos (NULL), outsize (0), encoder (out_batch_size), - inout (NULL), - ephemeral_inout (NULL), + sink (NULL), + ephemeral_sink (NULL), options (options_), plugged (false) { @@ -55,18 +54,18 @@ zmq::zmq_engine_t::~zmq_engine_t () zmq_assert (!plugged); } -void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_) +void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) { zmq_assert (!plugged); plugged = true; - ephemeral_inout = NULL; + ephemeral_sink = NULL; // Connect to session/init object. - zmq_assert (!inout); - zmq_assert (inout_); - encoder.set_inout (inout_); - decoder.set_inout (inout_); - inout = inout_; + zmq_assert (!sink); + zmq_assert (sink_); + encoder.set_sink (sink_); + decoder.set_sink (sink_); + sink = sink_; // Connect to I/O threads poller object. io_object_t::plug (io_thread_); @@ -90,10 +89,10 @@ void zmq::zmq_engine_t::unplug () io_object_t::unplug (); // Disconnect from init/session object. - encoder.set_inout (NULL); - decoder.set_inout (NULL); - ephemeral_inout = inout; - inout = NULL; + encoder.set_sink (NULL); + decoder.set_sink (NULL); + ephemeral_sink = sink; + sink = NULL; } void zmq::zmq_engine_t::terminate () @@ -149,13 +148,13 @@ void zmq::zmq_engine_t::in_event () // Flush all messages the decoder may have produced. // If IO handler has unplugged engine, flush transient IO handler. if (unlikely (!plugged)) { - zmq_assert (ephemeral_inout); - ephemeral_inout->flush (); + zmq_assert (ephemeral_sink); + ephemeral_sink->flush (); } else { - inout->flush (); + sink->flush (); } - if (inout && disconnection) + if (sink && disconnection) error (); } @@ -169,8 +168,8 @@ void zmq::zmq_engine_t::out_event () // If IO handler has unplugged engine, flush transient IO handler. if (unlikely (!plugged)) { - zmq_assert (ephemeral_inout); - ephemeral_inout->flush (); + zmq_assert (ephemeral_sink); + ephemeral_sink->flush (); return; } @@ -219,8 +218,8 @@ void zmq::zmq_engine_t::activate_in () void zmq::zmq_engine_t::error () { - zmq_assert (inout); - inout->detach (); + zmq_assert (sink); + sink->detach (); unplug (); delete this; } |