diff options
Diffstat (limited to 'src/zmq_connecter.cpp')
-rw-r--r-- | src/zmq_connecter.cpp | 78 |
1 files changed, 43 insertions, 35 deletions
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index ebd7572..46ac8b5 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -19,55 +19,60 @@ #include <new> +#include "platform.hpp" +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include <sys/types.h> +#include <unistd.h> +#endif + #include "zmq_connecter.hpp" #include "zmq_engine.hpp" #include "zmq_init.hpp" #include "io_thread.hpp" #include "err.hpp" -zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, - socket_base_t *owner_, const options_t &options_, - uint64_t session_ordinal_, bool wait_) : - owned_t (parent_, owner_), - io_object_t (parent_), +zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_, + class session_t *session_, const options_t &options_, + const char *protocol_, const char *address_) : + own_t (io_thread_), + io_object_t (io_thread_), handle_valid (false), - wait (wait_), - session_ordinal (session_ordinal_), + wait (wait_before_connect), + session (session_), options (options_) { + int rc = tcp_connecter.set_address (protocol_, address_); + zmq_assert (rc == 0); } zmq::zmq_connecter_t::~zmq_connecter_t () { + if (wait) + cancel_timer (reconnect_timer_id); + if (handle_valid) + rm_fd (handle); } -int zmq::zmq_connecter_t::set_address (const char *protocol_, - const char *address_) +int zmq::zmq_connecter_t::get_reconnect_period () { - int rc = tcp_connecter.set_address (protocol_, address_); - if (rc != 0) - return rc; - protocol = protocol_; - address = address_; - return 0; +#if defined ZMQ_HAVE_WINDOWS + return (reconnect_period + (((int)GetCurrentProcessId () * 13) + % reconnect_period)); +#else + return (reconnect_period + (((int)getpid () * 13) % reconnect_period)); +#endif } void zmq::zmq_connecter_t::process_plug () { if (wait) - add_timer (); + add_timer (get_reconnect_period (), reconnect_timer_id); else start_connecting (); } -void zmq::zmq_connecter_t::process_unplug () -{ - if (wait) - cancel_timer (); - if (handle_valid) - rm_fd (handle); -} - void zmq::zmq_connecter_t::in_event () { // We are not polling for incomming data, so we are actually called @@ -86,25 +91,28 @@ void zmq::zmq_connecter_t::out_event () if (fd == retired_fd) { tcp_connecter.close (); wait = true; - add_timer (); + add_timer (get_reconnect_period (), reconnect_timer_id); return; } + // Choose I/O thread to run connecter in. Given that we are already + // running in an I/O thread, there must be at least one available. + io_thread_t *io_thread = choose_io_thread (options.affinity); + zmq_assert (io_thread); + // Create an init object. - zmq_init_t *init = new (std::nothrow) zmq_init_t ( - choose_io_thread (options.affinity), owner, - fd, options, true, protocol.c_str (), address.c_str (), - session_ordinal); + zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL, + session, fd, options); zmq_assert (init); - send_plug (init); - send_own (owner, init); + launch_sibling (init); - // Ask owner socket to shut the connecter down. - term (); + // Shut the connecter down. + terminate (); } -void zmq::zmq_connecter_t::timer_event () +void zmq::zmq_connecter_t::timer_event (int id_) { + zmq_assert (id_ == reconnect_timer_id); wait = false; start_connecting (); } @@ -132,5 +140,5 @@ void zmq::zmq_connecter_t::start_connecting () // Handle any other error condition by eventual reconnect. wait = true; - add_timer (); + add_timer (get_reconnect_period (), reconnect_timer_id); } |