diff options
Diffstat (limited to 'src/zmq_connecter.cpp')
-rw-r--r-- | src/zmq_connecter.cpp | 118 |
1 files changed, 74 insertions, 44 deletions
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index ebd7572..fb77cdc 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -1,73 +1,69 @@ /* - Copyright (c) 2007-2010 iMatix Corporation + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by + the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. 0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. + GNU Lesser General Public License for more details. - You should have received a copy of the Lesser GNU General Public License + You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. */ #include <new> +#include "platform.hpp" +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include <sys/types.h> +#include <unistd.h> +#endif + #include "zmq_connecter.hpp" #include "zmq_engine.hpp" #include "zmq_init.hpp" #include "io_thread.hpp" #include "err.hpp" -zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, - socket_base_t *owner_, const options_t &options_, - uint64_t session_ordinal_, bool wait_) : - owned_t (parent_, owner_), - io_object_t (parent_), +zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_, + class session_t *session_, const options_t &options_, + const char *protocol_, const char *address_, bool wait_) : + own_t (io_thread_, options_), + io_object_t (io_thread_), handle_valid (false), wait (wait_), - session_ordinal (session_ordinal_), - options (options_) + session (session_), + current_reconnect_ivl(options.reconnect_ivl) { + int rc = tcp_connecter.set_address (protocol_, address_); + zmq_assert (rc == 0); } zmq::zmq_connecter_t::~zmq_connecter_t () { -} - -int zmq::zmq_connecter_t::set_address (const char *protocol_, - const char *address_) -{ - int rc = tcp_connecter.set_address (protocol_, address_); - if (rc != 0) - return rc; - protocol = protocol_; - address = address_; - return 0; + if (wait) + cancel_timer (reconnect_timer_id); + if (handle_valid) + rm_fd (handle); } void zmq::zmq_connecter_t::process_plug () { if (wait) - add_timer (); + add_reconnect_timer(); else start_connecting (); } -void zmq::zmq_connecter_t::process_unplug () -{ - if (wait) - cancel_timer (); - if (handle_valid) - rm_fd (handle); -} - void zmq::zmq_connecter_t::in_event () { // We are not polling for incomming data, so we are actually called @@ -86,25 +82,28 @@ void zmq::zmq_connecter_t::out_event () if (fd == retired_fd) { tcp_connecter.close (); wait = true; - add_timer (); + add_reconnect_timer(); return; } + // Choose I/O thread to run connecter in. Given that we are already + // running in an I/O thread, there must be at least one available. + io_thread_t *io_thread = choose_io_thread (options.affinity); + zmq_assert (io_thread); + // Create an init object. - zmq_init_t *init = new (std::nothrow) zmq_init_t ( - choose_io_thread (options.affinity), owner, - fd, options, true, protocol.c_str (), address.c_str (), - session_ordinal); - zmq_assert (init); - send_plug (init); - send_own (owner, init); - - // Ask owner socket to shut the connecter down. - term (); + zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL, + session, fd, options); + alloc_assert (init); + launch_sibling (init); + + // Shut the connecter down. + terminate (); } -void zmq::zmq_connecter_t::timer_event () +void zmq::zmq_connecter_t::timer_event (int id_) { + zmq_assert (id_ == reconnect_timer_id); wait = false; start_connecting (); } @@ -132,5 +131,36 @@ void zmq::zmq_connecter_t::start_connecting () // Handle any other error condition by eventual reconnect. wait = true; - add_timer (); + add_reconnect_timer(); +} + +void zmq::zmq_connecter_t::add_reconnect_timer() +{ + add_timer (get_new_reconnect_ivl(), reconnect_timer_id); +} + +int zmq::zmq_connecter_t::get_new_reconnect_ivl () +{ +#if defined ZMQ_HAVE_WINDOWS + int pid = (int) GetCurrentProcessId (); +#else + int pid = (int) getpid (); +#endif + + // The new interval is the current interval + random value. + int this_interval = current_reconnect_ivl + + ((pid * 13) % options.reconnect_ivl); + + // Only change the current reconnect interval if the maximum reconnect + // interval was set and if it's larger than the reconnect interval. + if (options.reconnect_ivl_max > 0 && + options.reconnect_ivl_max > options.reconnect_ivl) { + + // Calculate the next interval + current_reconnect_ivl = current_reconnect_ivl * 2; + if(current_reconnect_ivl >= options.reconnect_ivl_max) { + current_reconnect_ivl = options.reconnect_ivl_max; + } + } + return this_interval; } |