diff options
Diffstat (limited to 'src/zmq_engine.cpp')
-rw-r--r-- | src/zmq_engine.cpp | 119 |
1 files changed, 72 insertions, 47 deletions
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 41b10c8..57a8baf 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -1,19 +1,20 @@ /* - Copyright (c) 2007-2010 iMatix Corporation + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by + the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. 0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. + GNU Lesser General Public License for more details. - You should have received a copy of the Lesser GNU General Public License + You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. */ @@ -32,10 +33,7 @@ #include "config.hpp" #include "err.hpp" -zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, - const options_t &options_, bool reconnect_, - const char *protocol_, const char *address_) : - io_object_t (parent_), +zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : inpos (NULL), insize (0), decoder (in_batch_size), @@ -43,14 +41,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, outsize (0), encoder (out_batch_size), inout (NULL), + ephemeral_inout (NULL), options (options_), - reconnect (reconnect_) + plugged (false) { - if (reconnect) { - protocol = protocol_; - address = address_; - } - // Initialise the underlying socket. int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf); zmq_assert (rc == 0); @@ -58,33 +52,56 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, zmq::zmq_engine_t::~zmq_engine_t () { + zmq_assert (!plugged); } -void zmq::zmq_engine_t::plug (i_inout *inout_) +void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_) { - zmq_assert (!inout); + zmq_assert (!plugged); + plugged = true; + ephemeral_inout = NULL; + // Connect to session/init object. + zmq_assert (!inout); + zmq_assert (inout_); encoder.set_inout (inout_); decoder.set_inout (inout_); + inout = inout_; + // Connect to I/O threads poller object. + io_object_t::plug (io_thread_); handle = add_fd (tcp_socket.get_fd ()); set_pollin (handle); set_pollout (handle); - inout = inout_; - // Flush all the data that may have been already received downstream. in_event (); } void zmq::zmq_engine_t::unplug () { + zmq_assert (plugged); + plugged = false; + + // Cancel all fd subscriptions. rm_fd (handle); + + // Disconnect from I/O threads poller object. + io_object_t::unplug (); + + // Disconnect from init/session object. encoder.set_inout (NULL); decoder.set_inout (NULL); + ephemeral_inout = inout; inout = NULL; } +void zmq::zmq_engine_t::terminate () +{ + unplug (); + delete this; +} + void zmq::zmq_engine_t::in_event () { bool disconnection = false; @@ -106,23 +123,36 @@ void zmq::zmq_engine_t::in_event () // Push the data to the decoder. size_t processed = decoder.process_buffer (inpos, insize); - // Stop polling for input if we got stuck. - if (processed < insize) { - - // This may happen if queue limits are in effect or when - // init object reads all required information from the socket - // and rejects to read more data. - reset_pollin (handle); + if (unlikely (processed == (size_t) -1)) { + disconnection = true; } + else { + + // Stop polling for input if we got stuck. + if (processed < insize) { - // Adjust the buffer. - inpos += processed; - insize -= processed; + // This may happen if queue limits are in effect or when + // init object reads all required information from the socket + // and rejects to read more data. + if (plugged) + reset_pollin (handle); + } + + // Adjust the buffer. + inpos += processed; + insize -= processed; + } // Flush all messages the decoder may have produced. - inout->flush (); + // If IO handler has unplugged engine, flush transient IO handler. + if (unlikely (!plugged)) { + zmq_assert (ephemeral_inout); + ephemeral_inout->flush (); + } else { + inout->flush (); + } - if (disconnection) + if (inout && disconnection) error (); } @@ -133,7 +163,14 @@ void zmq::zmq_engine_t::out_event () outpos = NULL; encoder.get_data (&outpos, &outsize); - + + // If IO handler has unplugged engine, flush transient IO handler. + if (unlikely (!plugged)) { + zmq_assert (ephemeral_inout); + ephemeral_inout->flush (); + return; + } + // If there is no data to send, stop polling for output. if (outsize == 0) { reset_pollout (handle); @@ -155,7 +192,7 @@ void zmq::zmq_engine_t::out_event () outsize -= nbytes; } -void zmq::zmq_engine_t::revive () +void zmq::zmq_engine_t::activate_out () { set_pollout (handle); @@ -166,30 +203,18 @@ void zmq::zmq_engine_t::revive () out_event (); } -void zmq::zmq_engine_t::resume_input () +void zmq::zmq_engine_t::activate_in () { set_pollin (handle); + // Speculative read. in_event (); } void zmq::zmq_engine_t::error () { zmq_assert (inout); - - zmq_connecter_t *reconnecter = NULL; - if (reconnect) { - - // Create a connecter object to attempt reconnect. - // Ask it to wait for a while before reconnecting. - reconnecter = new (std::nothrow) zmq_connecter_t ( - inout->get_io_thread (), inout->get_owner (), - options, inout->get_ordinal (), true); - zmq_assert (reconnecter); - reconnecter->set_address (protocol.c_str(), address.c_str ()); - } - - inout->detach (reconnecter); + inout->detach (); unplug (); delete this; } |