From 059beca59d39d90a8ee0e1b07f840994962ea89e Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 12 Aug 2009 09:40:16 +0200 Subject: listener/connecter/init/session added --- src/zmq_connecter.cpp | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) (limited to 'src/zmq_connecter.cpp') diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 513508d..4416a70 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -18,11 +18,16 @@ */ #include "zmq_connecter.hpp" +#include "zmq_init.hpp" +#include "io_thread.hpp" #include "err.hpp" -zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, object_t *owner_) : - io_object_t (parent_, owner_), - waiting (false) +zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, object_t *owner_, + const options_t &options_) : + owned_t (parent_, owner_), + io_object_t (parent_), + handle_valid (false), + options (options_) { } @@ -38,12 +43,12 @@ int zmq::zmq_connecter_t::set_address (const char *addr_) void zmq::zmq_connecter_t::process_plug () { start_connecting (); - io_object_t::process_plug (); + owned_t::process_plug (); } void zmq::zmq_connecter_t::process_unplug () { - if (!waiting) + if (handle_valid) rm_fd (handle); } @@ -58,30 +63,31 @@ void zmq::zmq_connecter_t::in_event () void zmq::zmq_connecter_t::out_event () { fd_t fd = tcp_connecter.connect (); + rm_fd (handle); + handle_valid = false; // If there was error during the connecting, close the socket and wait // for a while before trying to reconnect. if (fd == retired_fd) { - rm_fd (handle); tcp_connecter.close (); - waiting = true; add_timer (); return; } - zmq_assert (false); + // Create an init object. + io_thread_t *io_thread = choose_io_thread (options.affinity); + zmq_init_t *init = new zmq_init_t (io_thread, owner, fd, true, options); + zmq_assert (init); + send_plug (init); + send_own (owner, init); -/* - object_t *engine = new zmq_engine_t (choose_io_thread (0), owner); - send_plug (engine); - send_own (owner, engine); -*/ + // Ask owner socket to shut the connecter down. + term (); } void zmq::zmq_connecter_t::timer_event () { // Reconnect period have elapsed. - waiting = false; start_connecting (); } @@ -99,12 +105,12 @@ void zmq::zmq_connecter_t::start_connecting () // Connection establishment may be dealyed. Poll for its completion. else if (rc == -1 && errno == EINPROGRESS) { handle = add_fd (tcp_connecter.get_fd ()); + handle_valid = true; set_pollout (handle); return; } // If none of the above is true, synchronous error occured. // Wait for a while and retry. - waiting = true; add_timer (); } -- cgit v1.2.3