summaryrefslogtreecommitdiff
path: root/src/zmq_connecter.cpp
diff options
context:
space:
mode:
authorMartin Lucina <mato@kotelna.sk>2011-03-28 10:39:51 +0200
committerMartin Lucina <martin@lucina.net>2012-01-23 08:53:37 +0100
commit3e20cb1b8a2b1ca222011df37334e5f4f88dd565 (patch)
tree4a753775186bc7f583f1ceb3f9aa675b6f110596 /src/zmq_connecter.cpp
parent3f0085ddbef1a44b6bb7a0b23af497d56e0025fa (diff)
parente645fc2693acc796304498909786b7b47005b429 (diff)
Imported Debian patch 2.1.3-1debian/2.1.3-1
Diffstat (limited to 'src/zmq_connecter.cpp')
-rw-r--r--src/zmq_connecter.cpp118
1 files changed, 74 insertions, 44 deletions
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index ebd7572..fb77cdc 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -1,73 +1,69 @@
/*
- Copyright (c) 2007-2010 iMatix Corporation
+ Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
+ the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
+ GNU Lesser General Public License for more details.
- You should have received a copy of the Lesser GNU General Public License
+ You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <new>
+#include "platform.hpp"
+#if defined ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#else
+#include <sys/types.h>
+#include <unistd.h>
+#endif
+
#include "zmq_connecter.hpp"
#include "zmq_engine.hpp"
#include "zmq_init.hpp"
#include "io_thread.hpp"
#include "err.hpp"
-zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_,
- socket_base_t *owner_, const options_t &options_,
- uint64_t session_ordinal_, bool wait_) :
- owned_t (parent_, owner_),
- io_object_t (parent_),
+zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_,
+ class session_t *session_, const options_t &options_,
+ const char *protocol_, const char *address_, bool wait_) :
+ own_t (io_thread_, options_),
+ io_object_t (io_thread_),
handle_valid (false),
wait (wait_),
- session_ordinal (session_ordinal_),
- options (options_)
+ session (session_),
+ current_reconnect_ivl(options.reconnect_ivl)
{
+ int rc = tcp_connecter.set_address (protocol_, address_);
+ zmq_assert (rc == 0);
}
zmq::zmq_connecter_t::~zmq_connecter_t ()
{
-}
-
-int zmq::zmq_connecter_t::set_address (const char *protocol_,
- const char *address_)
-{
- int rc = tcp_connecter.set_address (protocol_, address_);
- if (rc != 0)
- return rc;
- protocol = protocol_;
- address = address_;
- return 0;
+ if (wait)
+ cancel_timer (reconnect_timer_id);
+ if (handle_valid)
+ rm_fd (handle);
}
void zmq::zmq_connecter_t::process_plug ()
{
if (wait)
- add_timer ();
+ add_reconnect_timer();
else
start_connecting ();
}
-void zmq::zmq_connecter_t::process_unplug ()
-{
- if (wait)
- cancel_timer ();
- if (handle_valid)
- rm_fd (handle);
-}
-
void zmq::zmq_connecter_t::in_event ()
{
// We are not polling for incomming data, so we are actually called
@@ -86,25 +82,28 @@ void zmq::zmq_connecter_t::out_event ()
if (fd == retired_fd) {
tcp_connecter.close ();
wait = true;
- add_timer ();
+ add_reconnect_timer();
return;
}
+ // Choose I/O thread to run connecter in. Given that we are already
+ // running in an I/O thread, there must be at least one available.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ zmq_assert (io_thread);
+
// Create an init object.
- zmq_init_t *init = new (std::nothrow) zmq_init_t (
- choose_io_thread (options.affinity), owner,
- fd, options, true, protocol.c_str (), address.c_str (),
- session_ordinal);
- zmq_assert (init);
- send_plug (init);
- send_own (owner, init);
-
- // Ask owner socket to shut the connecter down.
- term ();
+ zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL,
+ session, fd, options);
+ alloc_assert (init);
+ launch_sibling (init);
+
+ // Shut the connecter down.
+ terminate ();
}
-void zmq::zmq_connecter_t::timer_event ()
+void zmq::zmq_connecter_t::timer_event (int id_)
{
+ zmq_assert (id_ == reconnect_timer_id);
wait = false;
start_connecting ();
}
@@ -132,5 +131,36 @@ void zmq::zmq_connecter_t::start_connecting ()
// Handle any other error condition by eventual reconnect.
wait = true;
- add_timer ();
+ add_reconnect_timer();
+}
+
+void zmq::zmq_connecter_t::add_reconnect_timer()
+{
+ add_timer (get_new_reconnect_ivl(), reconnect_timer_id);
+}
+
+int zmq::zmq_connecter_t::get_new_reconnect_ivl ()
+{
+#if defined ZMQ_HAVE_WINDOWS
+ int pid = (int) GetCurrentProcessId ();
+#else
+ int pid = (int) getpid ();
+#endif
+
+ // The new interval is the current interval + random value.
+ int this_interval = current_reconnect_ivl +
+ ((pid * 13) % options.reconnect_ivl);
+
+ // Only change the current reconnect interval if the maximum reconnect
+ // interval was set and if it's larger than the reconnect interval.
+ if (options.reconnect_ivl_max > 0 &&
+ options.reconnect_ivl_max > options.reconnect_ivl) {
+
+ // Calculate the next interval
+ current_reconnect_ivl = current_reconnect_ivl * 2;
+ if(current_reconnect_ivl >= options.reconnect_ivl_max) {
+ current_reconnect_ivl = options.reconnect_ivl_max;
+ }
+ }
+ return this_interval;
}