From 2cef05d86976784f4bc1083cb0fa548e267ac132 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 15 Dec 2009 17:49:40 +0100 Subject: reconnection mechanism added to tcp transport --- src/i_inout.hpp | 15 +++++++++++++-- src/io_object.hpp | 1 + src/io_thread.hpp | 1 + src/object.hpp | 5 +++-- src/session.cpp | 23 +++++++++++++++++++++-- src/session.hpp | 5 ++++- src/zmq_connecter.cpp | 10 +++++++--- src/zmq_connecter.hpp | 5 ++++- src/zmq_connecter_init.cpp | 36 +++++++++++++++++++++--------------- src/zmq_connecter_init.hpp | 11 +++++++---- src/zmq_engine.cpp | 23 ++++++++++++++++++++--- src/zmq_engine.hpp | 5 ++++- src/zmq_listener_init.cpp | 22 ++++++++++++++++++++-- src/zmq_listener_init.hpp | 5 ++++- 14 files changed, 130 insertions(+), 37 deletions(-) diff --git a/src/i_inout.hpp b/src/i_inout.hpp index 6be6169..b82a476 100644 --- a/src/i_inout.hpp +++ b/src/i_inout.hpp @@ -36,8 +36,19 @@ namespace zmq // Flush all the previously written messages downstream. virtual void flush () = 0; - // Drop all the references to the engine. - virtual void detach () = 0; + // Drop all the references to the engine. The parameter is the object + // to use to reconnect. If reconnection is not required, the argument + // is set to NULL. + virtual void detach (class owned_t *reconnecter_) = 0; + + // Returns least loaded I/O thread. + virtual class io_thread_t *get_io_thread () = 0; + + // Return pointer to the owning socket. + virtual class socket_base_t *get_owner () = 0; + + // Returns the name of associated session. + virtual const char *get_session_name () = 0; }; } diff --git a/src/io_object.hpp b/src/io_object.hpp index 2ed5e24..7b66687 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -22,6 +22,7 @@ #include +#include "stdint.hpp" #include "poller.hpp" #include "i_poll_events.hpp" diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 457cdbf..c0dd77b 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -22,6 +22,7 @@ #include +#include "stdint.hpp" #include "object.hpp" #include "poller.hpp" #include "i_poll_events.hpp" diff --git a/src/object.hpp b/src/object.hpp index 84a57cd..496fd49 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -55,9 +55,10 @@ namespace zmq void unregister_endpoints (class socket_base_t *socket_); class socket_base_t *find_endpoint (const char *addr_); - // Derived object can use following functions to interact with - // global repositories. See dispatcher.hpp for function details. + // Returns number of thead slots in the dispatcher. int thread_slot_count (); + + // Chooses least loaded I/O thread. class io_thread_t *choose_io_thread (uint64_t taskset_); // Derived object can use these functions to send commands diff --git a/src/session.cpp b/src/session.cpp index 912cfa8..cbcc883 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -65,9 +65,13 @@ void zmq::session_t::flush () out_pipe->flush (); } -void zmq::session_t::detach () +void zmq::session_t::detach (owned_t *reconnecter_) { - // TODO: Start reconnection process here. + // Plug in the reconnecter object if any. + if (reconnecter_) { + send_plug (reconnecter_); + send_own (owner, reconnecter_); + } // Engine is terminating itself. No need to deallocate it from here. engine = NULL; @@ -77,6 +81,21 @@ void zmq::session_t::detach () term (); } +zmq::io_thread_t *zmq::session_t::get_io_thread () +{ + return choose_io_thread (options.affinity); +} + +class zmq::socket_base_t *zmq::session_t::get_owner () +{ + return owner; +} + +const char *zmq::session_t::get_session_name () +{ + return name.c_str (); +} + void zmq::session_t::attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_) { diff --git a/src/session.hpp b/src/session.hpp index 64900f2..72e1d59 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -41,7 +41,10 @@ namespace zmq bool read (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_); void flush (); - void detach (); + void detach (owned_t *reconnecter_); + class io_thread_t *get_io_thread (); + class socket_base_t *get_owner (); + const char *get_session_name (); // i_endpoint interface implementation. void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 8d5c59f..cd7d1b5 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -38,9 +38,13 @@ zmq::zmq_connecter_t::~zmq_connecter_t () { } -int zmq::zmq_connecter_t::set_address (const char *addr_) +int zmq::zmq_connecter_t::set_address (const char *address_) { - return tcp_connecter.set_address (addr_); + int rc = tcp_connecter.set_address (address_); + if (rc != 0) + return rc; + address = address_; + return 0; } void zmq::zmq_connecter_t::process_plug () @@ -84,7 +88,7 @@ void zmq::zmq_connecter_t::out_event () // Create an init object. io_thread_t *io_thread = choose_io_thread (options.affinity); zmq_connecter_init_t *init = new zmq_connecter_init_t (io_thread, owner, - fd, options, session_name.c_str ()); + fd, options, session_name.c_str (), address.c_str ()); zmq_assert (init); send_plug (init); send_own (owner, init); diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index c443ce8..acd3352 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -40,7 +40,7 @@ namespace zmq ~zmq_connecter_t (); // Set IP address to connect to. - int set_address (const char *addr_); + int set_address (const char *address_); private: @@ -75,6 +75,9 @@ namespace zmq // Name of the session associated with the connecter. std::string session_name; + // Address to connect to. + std::string address; + zmq_connecter_t (const zmq_connecter_t&); void operator = (const zmq_connecter_t&); }; diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp index ea6a8c0..1418429 100644 --- a/src/zmq_connecter_init.cpp +++ b/src/zmq_connecter_init.cpp @@ -25,13 +25,13 @@ zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_, socket_base_t *owner_, fd_t fd_, const options_t &options_, - const char *session_name_) : + const char *session_name_, const char *address_) : owned_t (parent_, owner_), options (options_), session_name (session_name_) { // Create associated engine object. - engine = new zmq_engine_t (parent_, fd_, options); + engine = new zmq_engine_t (parent_, fd_, options, true, address_); zmq_assert (engine); } @@ -87,27 +87,33 @@ void zmq::zmq_connecter_init_t::flush () // We are not expecting any messages. No point in flushing. } -void zmq::zmq_connecter_init_t::detach () +void zmq::zmq_connecter_init_t::detach (owned_t *reconnecter_) { - // TODO: Start reconnection process here. -/* - // Create a connecter object to attempt reconnect. Ask it to wait for a - // while before reconnecting. - io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_connecter_t *connecter = new zmq_connecter_t (io_thread, owner, - options, session_name.c_str (), true); - connecter->set_address (...); - zmq_assert (connecter); - send_plug (connecter); - send_own (owner, connecter); -*/ + // Plug in the reconnecter object. + zmq_assert (reconnecter_); + send_plug (reconnecter_); + send_own (owner, reconnecter_); // This function is called by engine when disconnection occurs. // The engine will destroy itself, so we just drop the pointer here and // start termination of the init object. engine = NULL; term (); +} + +zmq::io_thread_t *zmq::zmq_connecter_init_t::get_io_thread () +{ + return choose_io_thread (options.affinity); +} +class zmq::socket_base_t *zmq::zmq_connecter_init_t::get_owner () +{ + return owner; +} + +const char *zmq::zmq_connecter_init_t::get_session_name () +{ + return session_name.c_str (); } void zmq::zmq_connecter_init_t::process_plug () diff --git a/src/zmq_connecter_init.hpp b/src/zmq_connecter_init.hpp index 3f42fc6..03ccd24 100644 --- a/src/zmq_connecter_init.hpp +++ b/src/zmq_connecter_init.hpp @@ -40,7 +40,8 @@ namespace zmq public: zmq_connecter_init_t (class io_thread_t *parent_, socket_base_t *owner_, - fd_t fd_, const options_t &options, const char *session_name_); + fd_t fd_, const options_t &options, const char *session_name_, + const char *address_); ~zmq_connecter_init_t (); private: @@ -49,7 +50,10 @@ namespace zmq bool read (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_); void flush (); - void detach (); + void detach (owned_t *reconnecter_); + class io_thread_t *get_io_thread (); + class socket_base_t *get_owner (); + const char *get_session_name (); // Handlers for incoming commands. void process_plug (); @@ -63,8 +67,7 @@ namespace zmq // Associated socket options. options_t options; - // Name of the session to bind new connection to. Makes sense only - // when 'connected' is true. + // Name of the session to bind new connection to. std::string session_name; zmq_connecter_init_t (const zmq_connecter_init_t&); diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 18fc616..6b439f5 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -25,7 +25,7 @@ #include "err.hpp" zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, - const options_t &options_) : + const options_t &options_, bool reconnect_, const char *address_) : io_object_t (parent_), inpos (NULL), insize (0), @@ -34,8 +34,12 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, outsize (0), encoder (out_batch_size, false), inout (NULL), - options (options_) + options (options_), + reconnect (reconnect_) { + if (reconnect) + address = address_; + // Initialise the underlying socket. int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf); zmq_assert (rc == 0); @@ -145,7 +149,20 @@ void zmq::zmq_engine_t::revive () void zmq::zmq_engine_t::error () { zmq_assert (inout); - inout->detach (); + + zmq_connecter_t *reconnecter = NULL; + if (reconnect) { + + // Create a connecter object to attempt reconnect. + // Ask it to wait for a while before reconnecting. + reconnecter = new zmq_connecter_t ( + inout->get_io_thread (), inout->get_owner (), + options, inout->get_session_name (), true); + zmq_assert (reconnecter); + reconnecter->set_address (address.c_str ()); + } + + inout->detach (reconnecter); unplug (); delete this; } diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 8d9e4b9..63e1539 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -37,7 +37,7 @@ namespace zmq public: zmq_engine_t (class io_thread_t *parent_, fd_t fd_, - const options_t &options_); + const options_t &options_, bool reconnect_, const char *address_); ~zmq_engine_t (); // i_engine interface implementation. @@ -69,6 +69,9 @@ namespace zmq options_t options; + bool reconnect; + std::string address; + zmq_engine_t (const zmq_engine_t&); void operator = (const zmq_engine_t&); }; diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp index 0c9f0ee..a463dc8 100644 --- a/src/zmq_listener_init.cpp +++ b/src/zmq_listener_init.cpp @@ -29,7 +29,7 @@ zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_, has_peer_identity (false) { // Create associated engine object. - engine = new zmq_engine_t (parent_, fd_, options); + engine = new zmq_engine_t (parent_, fd_, options, false, NULL); zmq_assert (engine); } @@ -93,8 +93,11 @@ void zmq::zmq_listener_init_t::flush () term (); } -void zmq::zmq_listener_init_t::detach () +void zmq::zmq_listener_init_t::detach (owned_t *reconnecter_) { + // On the listening side of the connection we are never reconnecting. + zmq_assert (reconnecter_ == NULL); + // This function is called by engine when disconnection occurs. // The engine will destroy itself, so we just drop the pointer here and // start termination of the init object. @@ -102,6 +105,21 @@ void zmq::zmq_listener_init_t::detach () term (); } +zmq::io_thread_t *zmq::zmq_listener_init_t::get_io_thread () +{ + return choose_io_thread (options.affinity); +} + +class zmq::socket_base_t *zmq::zmq_listener_init_t::get_owner () +{ + return owner; +} + +const char *zmq::zmq_listener_init_t::get_session_name () +{ + zmq_assert (false); +} + void zmq::zmq_listener_init_t::process_plug () { zmq_assert (engine); diff --git a/src/zmq_listener_init.hpp b/src/zmq_listener_init.hpp index 885b36b..d7fde02 100644 --- a/src/zmq_listener_init.hpp +++ b/src/zmq_listener_init.hpp @@ -49,7 +49,10 @@ namespace zmq bool read (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_); void flush (); - void detach (); + void detach (owned_t *reconnecter_); + class io_thread_t *get_io_thread (); + class socket_base_t *get_owner (); + const char *get_session_name (); // Handlers for incoming commands. void process_plug (); -- cgit v1.2.3