From 9c522dccaf0b2c8074bd96fbfb4c968f45748ba4 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 16 Sep 2009 11:02:18 +0200 Subject: reconnect added to zmq_connecter --- perf/c/remote_lat.c | 10 ++++++---- perf/cpp/remote_lat.cpp | 5 ++++- src/session.cpp | 9 ++++++--- src/session.hpp | 5 ++++- src/socket_base.cpp | 4 ++-- src/zmq_connecter.cpp | 22 +++++++++++++++++----- src/zmq_connecter.hpp | 5 ++++- src/zmq_connecter_init.cpp | 22 ++++++++++++++++++++-- src/zmq_engine.cpp | 1 + src/zmq_engine.hpp | 1 + src/zmq_listener_init.cpp | 2 +- 11 files changed, 66 insertions(+), 20 deletions(-) diff --git a/perf/c/remote_lat.c b/perf/c/remote_lat.c index 23695b4..15dfc46 100644 --- a/perf/c/remote_lat.c +++ b/perf/c/remote_lat.c @@ -20,6 +20,7 @@ #include #include #include +#include #include int main (int argc, char *argv []) @@ -54,10 +55,11 @@ int main (int argc, char *argv []) rc = zmq_connect (s, connect_to); assert (rc == 0); - watch = zmq_stopwatch_start (); - rc = zmq_msg_init_size (&msg, message_size); assert (rc == 0); + memset (zmq_msg_data (&msg), 0, message_size); + + watch = zmq_stopwatch_start (); for (i = 0; i != roundtrip_count; i++) { rc = zmq_send (s, &msg, 0); @@ -67,11 +69,11 @@ int main (int argc, char *argv []) assert (zmq_msg_size (&msg) == message_size); } + elapsed = zmq_stopwatch_stop (watch); + rc = zmq_msg_close (&msg); assert (rc == 0); - elapsed = zmq_stopwatch_stop (watch); - latency = (double) elapsed / (roundtrip_count * 2); printf ("message size: %d [B]\n", (int) message_size); diff --git a/perf/cpp/remote_lat.cpp b/perf/cpp/remote_lat.cpp index f1d2a17..f6ccbb4 100644 --- a/perf/cpp/remote_lat.cpp +++ b/perf/cpp/remote_lat.cpp @@ -22,6 +22,7 @@ #include #include #include +#include int main (int argc, char *argv []) { @@ -39,10 +40,12 @@ int main (int argc, char *argv []) zmq::socket_t s (ctx, ZMQ_REQ); s.connect (connect_to); + zmq::message_t msg (message_size); + memset (msg.data (), 0, message_size); + void *watch = zmq_stopwatch_start (); for (int i = 0; i != roundtrip_count; i++) { - zmq::message_t msg (message_size); s.send (msg); s.recv (&msg); assert (msg.size () == message_size); diff --git a/src/session.cpp b/src/session.cpp index 31c6354..9593827 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -23,14 +23,15 @@ #include "pipe.hpp" zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const char *name_, const options_t &options_) : + const char *name_, const options_t &options_, bool reconnect_) : owned_t (parent_, owner_), in_pipe (NULL), active (true), out_pipe (NULL), engine (NULL), name (name_), - options (options_) + options (options_), + reconnect (reconnect_) { } @@ -69,7 +70,9 @@ void zmq::session_t::flush () void zmq::session_t::detach () { - // Engine is terminating itself. + // TODO: Start reconnection process here. + + // Engine is terminating itself. No need to deallocate it from here. engine = NULL; // In the case od anonymous connection, terminate the session. diff --git a/src/session.hpp b/src/session.hpp index 195bdca..55aa8ea 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -35,7 +35,7 @@ namespace zmq public: session_t (object_t *parent_, socket_base_t *owner_, const char *name_, - const options_t &options_); + const options_t &options_, bool reconnect_); // i_inout interface implementation. bool read (::zmq_msg_t *msg_); @@ -77,6 +77,9 @@ namespace zmq // Inherited socket options. options_t options; + // If true, reconnection is required after connection breaks. + bool reconnect; + session_t (const session_t&); void operator = (const session_t&); }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index c195e91..10f1404 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -236,7 +236,7 @@ int zmq::socket_base_t::connect (const char *addr_) // Create the session. io_thread_t *io_thread = choose_io_thread (options.affinity); session_t *session = new session_t (io_thread, this, session_name.c_str (), - options); + options, true); zmq_assert (session); // Create inbound pipe. @@ -267,7 +267,7 @@ int zmq::socket_base_t::connect (const char *addr_) // it can bind the new connection to the session once it is established. zmq_connecter_t *connecter = new zmq_connecter_t ( choose_io_thread (options.affinity), this, options, - session_name.c_str ()); + session_name.c_str (), false); int rc = connecter->set_address (addr_args.c_str ()); if (rc != 0) { delete connecter; diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index be88bff..fb147c0 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -24,10 +24,11 @@ zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, socket_base_t *owner_, const options_t &options_, - const char *session_name_) : + const char *session_name_, bool wait_) : owned_t (parent_, owner_), io_object_t (parent_), handle_valid (false), + wait (wait_), options (options_), session_name (session_name_) { @@ -44,12 +45,17 @@ int zmq::zmq_connecter_t::set_address (const char *addr_) void zmq::zmq_connecter_t::process_plug () { - start_connecting (); + if (wait) + add_timer (); + else + start_connecting (); owned_t::process_plug (); } void zmq::zmq_connecter_t::process_unplug () { + if (wait) + cancel_timer (); if (handle_valid) rm_fd (handle); } @@ -68,8 +74,13 @@ void zmq::zmq_connecter_t::out_event () rm_fd (handle); handle_valid = false; - // TODO: Handle the error condition by eventual reconnect. - zmq_assert (fd != retired_fd); + // Handle the error condition by attempt to reconnect. + if (fd == retired_fd) { + tcp_connecter.close (); + wait = true; + add_timer (); + return; + } // Create an init object. io_thread_t *io_thread = choose_io_thread (options.affinity); @@ -85,7 +96,8 @@ void zmq::zmq_connecter_t::out_event () void zmq::zmq_connecter_t::timer_event () { - zmq_assert (false); + wait = false; + start_connecting (); } void zmq::zmq_connecter_t::start_connecting () diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index e308502..c443ce8 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -36,7 +36,7 @@ namespace zmq public: zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_, - const options_t &options_, const char *session_name_); + const options_t &options_, const char *session_name_, bool wait_); ~zmq_connecter_t (); // Set IP address to connect to. @@ -66,6 +66,9 @@ namespace zmq // contains valid value. bool handle_valid; + // If true, connecter is waiting a while before trying to connect. + bool wait; + // Associated socket options. options_t options; diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp index 730077a..ffd4a64 100644 --- a/src/zmq_connecter_init.cpp +++ b/src/zmq_connecter_init.cpp @@ -18,6 +18,7 @@ */ #include "zmq_connecter_init.hpp" +#include "zmq_connecter.hpp" #include "io_thread.hpp" #include "session.hpp" #include "err.hpp" @@ -83,8 +84,25 @@ void zmq::zmq_connecter_init_t::flush () void zmq::zmq_connecter_init_t::detach () { - // TODO: Engine is closing down. Init object is to be closed as well. - zmq_assert (false); + // TODO: Start reconnection process here. +/* + // Create a connecter object to attempt reconnect. Ask it to wait for a + // while before reconnecting. + io_thread_t *io_thread = choose_io_thread (options.affinity); + zmq_connecter_t *connecter = new zmq_connecter_t (io_thread, owner, + options, session_name.c_str (), true); + connecter->set_address (...); + zmq_assert (connecter); + send_plug (connecter); + send_own (owner, connecter); +*/ + + // This function is called by engine when disconnection occurs. + // The engine will destroy itself, so we just drop the pointer here and + // start termination of the init object. + engine = NULL; + term (); + } void zmq::zmq_connecter_init_t::process_plug () diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 2c5eb1c..b82af0a 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -18,6 +18,7 @@ */ #include "zmq_engine.hpp" +#include "zmq_connecter.hpp" #include "io_thread.hpp" #include "i_inout.hpp" #include "config.hpp" diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 8299ebf..0d1b10a 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -25,6 +25,7 @@ #include "tcp_socket.hpp" #include "zmq_encoder.hpp" #include "zmq_decoder.hpp" +#include "options.hpp" namespace zmq { diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp index 756e9d8..eec41c7 100644 --- a/src/zmq_listener_init.cpp +++ b/src/zmq_listener_init.cpp @@ -76,7 +76,7 @@ void zmq::zmq_listener_init_t::flush () if (!session) { io_thread_t *io_thread = choose_io_thread (options.affinity); session = new session_t (io_thread, owner, peer_identity.c_str (), - options); + options, false); zmq_assert (session); send_plug (session); send_own (owner, session); -- cgit v1.2.3