diff options
Diffstat (limited to 'src/zmq_engine.cpp')
-rw-r--r-- | src/zmq_engine.cpp | 87 |
1 files changed, 47 insertions, 40 deletions
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 41b10c8..761f6fe 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -32,10 +32,7 @@ #include "config.hpp" #include "err.hpp" -zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, - const options_t &options_, bool reconnect_, - const char *protocol_, const char *address_) : - io_object_t (parent_), +zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : inpos (NULL), insize (0), decoder (in_batch_size), @@ -44,13 +41,8 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, encoder (out_batch_size), inout (NULL), options (options_), - reconnect (reconnect_) + plugged (false) { - if (reconnect) { - protocol = protocol_; - address = address_; - } - // Initialise the underlying socket. int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf); zmq_assert (rc == 0); @@ -58,33 +50,54 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, zmq::zmq_engine_t::~zmq_engine_t () { + zmq_assert (!plugged); } -void zmq::zmq_engine_t::plug (i_inout *inout_) +void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_) { - zmq_assert (!inout); + zmq_assert (!plugged); + plugged = true; + // Conncet to session/init object. + zmq_assert (!inout); + zmq_assert (inout_); encoder.set_inout (inout_); decoder.set_inout (inout_); + inout = inout_; + // Connect to I/O threads poller object. + io_object_t::plug (io_thread_); handle = add_fd (tcp_socket.get_fd ()); set_pollin (handle); set_pollout (handle); - inout = inout_; - // Flush all the data that may have been already received downstream. in_event (); } void zmq::zmq_engine_t::unplug () { + zmq_assert (plugged); + plugged = false; + + // Cancel all fd subscriptions. rm_fd (handle); + + // Disconnect from I/O threads poller object. + io_object_t::unplug (); + + // Disconnect from init/session object. encoder.set_inout (NULL); decoder.set_inout (NULL); inout = NULL; } +void zmq::zmq_engine_t::terminate () +{ + unplug (); + delete this; +} + void zmq::zmq_engine_t::in_event () { bool disconnection = false; @@ -106,18 +119,24 @@ void zmq::zmq_engine_t::in_event () // Push the data to the decoder. size_t processed = decoder.process_buffer (inpos, insize); - // Stop polling for input if we got stuck. - if (processed < insize) { - - // This may happen if queue limits are in effect or when - // init object reads all required information from the socket - // and rejects to read more data. - reset_pollin (handle); + if (unlikely (processed == (size_t) -1)) { + disconnection = true; } + else { + + // Stop polling for input if we got stuck. + if (processed < insize) { + + // This may happen if queue limits are in effect or when + // init object reads all required information from the socket + // and rejects to read more data. + reset_pollin (handle); + } - // Adjust the buffer. - inpos += processed; - insize -= processed; + // Adjust the buffer. + inpos += processed; + insize -= processed; + } // Flush all messages the decoder may have produced. inout->flush (); @@ -155,7 +174,7 @@ void zmq::zmq_engine_t::out_event () outsize -= nbytes; } -void zmq::zmq_engine_t::revive () +void zmq::zmq_engine_t::activate_out () { set_pollout (handle); @@ -166,30 +185,18 @@ void zmq::zmq_engine_t::revive () out_event (); } -void zmq::zmq_engine_t::resume_input () +void zmq::zmq_engine_t::activate_in () { set_pollin (handle); + // Speculative read. in_event (); } void zmq::zmq_engine_t::error () { zmq_assert (inout); - - zmq_connecter_t *reconnecter = NULL; - if (reconnect) { - - // Create a connecter object to attempt reconnect. - // Ask it to wait for a while before reconnecting. - reconnecter = new (std::nothrow) zmq_connecter_t ( - inout->get_io_thread (), inout->get_owner (), - options, inout->get_ordinal (), true); - zmq_assert (reconnecter); - reconnecter->set_address (protocol.c_str(), address.c_str ()); - } - - inout->detach (reconnecter); + inout->detach (); unplug (); delete this; } |