From d13933bc62fce71b5a58118020e0dd3776e79aa9 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 11 Aug 2010 14:09:56 +0200 Subject: I/O object hierarchy implemented --- src/zmq_engine.cpp | 52 +++++++++++++++++++++------------------------------- 1 file changed, 21 insertions(+), 31 deletions(-) (limited to 'src/zmq_engine.cpp') diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 41b10c8..de26b27 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -32,10 +32,7 @@ #include "config.hpp" #include "err.hpp" -zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, - const options_t &options_, bool reconnect_, - const char *protocol_, const char *address_) : - io_object_t (parent_), +zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : inpos (NULL), insize (0), decoder (in_batch_size), @@ -43,14 +40,8 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, outsize (0), encoder (out_batch_size), inout (NULL), - options (options_), - reconnect (reconnect_) + options (options_) { - if (reconnect) { - protocol = protocol_; - address = address_; - } - // Initialise the underlying socket. int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf); zmq_assert (rc == 0); @@ -60,26 +51,37 @@ zmq::zmq_engine_t::~zmq_engine_t () { } -void zmq::zmq_engine_t::plug (i_inout *inout_) +void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_) { + // Conncet to session/init object. zmq_assert (!inout); - + zmq_assert (inout_); encoder.set_inout (inout_); decoder.set_inout (inout_); + inout = inout_; + + // Connect to I/O threads poller object. + io_object_t::plug (io_thread_); handle = add_fd (tcp_socket.get_fd ()); set_pollin (handle); set_pollout (handle); - inout = inout_; - // Flush all the data that may have been already received downstream. in_event (); + + // TODO: Re-plug to the new I/O thread & poller! } void zmq::zmq_engine_t::unplug () { + // Cancel all fd subscriptions. rm_fd (handle); + + // Disconnect from I/O threads poller object. + io_object_t::unplug (); + + // Disconnect from init/session object. encoder.set_inout (NULL); decoder.set_inout (NULL); inout = NULL; @@ -155,7 +157,7 @@ void zmq::zmq_engine_t::out_event () outsize -= nbytes; } -void zmq::zmq_engine_t::revive () +void zmq::zmq_engine_t::activate_out () { set_pollout (handle); @@ -166,30 +168,18 @@ void zmq::zmq_engine_t::revive () out_event (); } -void zmq::zmq_engine_t::resume_input () +void zmq::zmq_engine_t::activate_in () { set_pollin (handle); + // Speculative read. in_event (); } void zmq::zmq_engine_t::error () { zmq_assert (inout); - - zmq_connecter_t *reconnecter = NULL; - if (reconnect) { - - // Create a connecter object to attempt reconnect. - // Ask it to wait for a while before reconnecting. - reconnecter = new (std::nothrow) zmq_connecter_t ( - inout->get_io_thread (), inout->get_owner (), - options, inout->get_ordinal (), true); - zmq_assert (reconnecter); - reconnecter->set_address (protocol.c_str(), address.c_str ()); - } - - inout->detach (reconnecter); + inout->detach (); unplug (); delete this; } -- cgit v1.2.3