summaryrefslogtreecommitdiff
path: root/src/zmq_connecter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zmq_connecter.cpp')
-rw-r--r--src/zmq_connecter.cpp78
1 files changed, 43 insertions, 35 deletions
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index ebd7572..46ac8b5 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -19,55 +19,60 @@
#include <new>
+#include "platform.hpp"
+#if defined ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#else
+#include <sys/types.h>
+#include <unistd.h>
+#endif
+
#include "zmq_connecter.hpp"
#include "zmq_engine.hpp"
#include "zmq_init.hpp"
#include "io_thread.hpp"
#include "err.hpp"
-zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_,
- socket_base_t *owner_, const options_t &options_,
- uint64_t session_ordinal_, bool wait_) :
- owned_t (parent_, owner_),
- io_object_t (parent_),
+zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_,
+ class session_t *session_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ own_t (io_thread_),
+ io_object_t (io_thread_),
handle_valid (false),
- wait (wait_),
- session_ordinal (session_ordinal_),
+ wait (wait_before_connect),
+ session (session_),
options (options_)
{
+ int rc = tcp_connecter.set_address (protocol_, address_);
+ zmq_assert (rc == 0);
}
zmq::zmq_connecter_t::~zmq_connecter_t ()
{
+ if (wait)
+ cancel_timer (reconnect_timer_id);
+ if (handle_valid)
+ rm_fd (handle);
}
-int zmq::zmq_connecter_t::set_address (const char *protocol_,
- const char *address_)
+int zmq::zmq_connecter_t::get_reconnect_period ()
{
- int rc = tcp_connecter.set_address (protocol_, address_);
- if (rc != 0)
- return rc;
- protocol = protocol_;
- address = address_;
- return 0;
+#if defined ZMQ_HAVE_WINDOWS
+ return (reconnect_period + (((int)GetCurrentProcessId () * 13)
+ % reconnect_period));
+#else
+ return (reconnect_period + (((int)getpid () * 13) % reconnect_period));
+#endif
}
void zmq::zmq_connecter_t::process_plug ()
{
if (wait)
- add_timer ();
+ add_timer (get_reconnect_period (), reconnect_timer_id);
else
start_connecting ();
}
-void zmq::zmq_connecter_t::process_unplug ()
-{
- if (wait)
- cancel_timer ();
- if (handle_valid)
- rm_fd (handle);
-}
-
void zmq::zmq_connecter_t::in_event ()
{
// We are not polling for incomming data, so we are actually called
@@ -86,25 +91,28 @@ void zmq::zmq_connecter_t::out_event ()
if (fd == retired_fd) {
tcp_connecter.close ();
wait = true;
- add_timer ();
+ add_timer (get_reconnect_period (), reconnect_timer_id);
return;
}
+ // Choose I/O thread to run connecter in. Given that we are already
+ // running in an I/O thread, there must be at least one available.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ zmq_assert (io_thread);
+
// Create an init object.
- zmq_init_t *init = new (std::nothrow) zmq_init_t (
- choose_io_thread (options.affinity), owner,
- fd, options, true, protocol.c_str (), address.c_str (),
- session_ordinal);
+ zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL,
+ session, fd, options);
zmq_assert (init);
- send_plug (init);
- send_own (owner, init);
+ launch_sibling (init);
- // Ask owner socket to shut the connecter down.
- term ();
+ // Shut the connecter down.
+ terminate ();
}
-void zmq::zmq_connecter_t::timer_event ()
+void zmq::zmq_connecter_t::timer_event (int id_)
{
+ zmq_assert (id_ == reconnect_timer_id);
wait = false;
start_connecting ();
}
@@ -132,5 +140,5 @@ void zmq::zmq_connecter_t::start_connecting ()
// Handle any other error condition by eventual reconnect.
wait = true;
- add_timer ();
+ add_timer (get_reconnect_period (), reconnect_timer_id);
}