From aebff623f36efddc0de7a3192832b61802f8cec8 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 23 Dec 2009 19:37:56 +0100 Subject: ZMQII-28: Bidirectional introduction on TCP connection establishment --- src/Makefile.am | 8 +- src/i_inout.hpp | 6 +- src/pgm_receiver.cpp | 3 +- src/pgm_receiver.hpp | 6 +- src/pgm_sender.cpp | 3 +- src/pgm_sender.hpp | 6 +- src/session.cpp | 54 ++++++++++--- src/session.hpp | 24 ++++-- src/socket_base.cpp | 83 +++++++++++++------ src/socket_base.hpp | 27 ++++--- src/zmq_connecter.cpp | 15 ++-- src/zmq_connecter.hpp | 8 +- src/zmq_connecter_init.cpp | 132 ------------------------------ src/zmq_connecter_init.hpp | 79 ------------------ src/zmq_engine.cpp | 2 +- src/zmq_init.cpp | 195 +++++++++++++++++++++++++++++++++++++++++++++ src/zmq_init.hpp | 89 +++++++++++++++++++++ src/zmq_listener.cpp | 6 +- src/zmq_listener_init.cpp | 137 ------------------------------- src/zmq_listener_init.hpp | 79 ------------------ 20 files changed, 445 insertions(+), 517 deletions(-) delete mode 100644 src/zmq_connecter_init.cpp delete mode 100644 src/zmq_connecter_init.hpp create mode 100644 src/zmq_init.cpp create mode 100644 src/zmq_init.hpp delete mode 100644 src/zmq_listener_init.cpp delete mode 100644 src/zmq_listener_init.hpp diff --git a/src/Makefile.am b/src/Makefile.am index 0fdaf37..0f47e7b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -115,12 +115,11 @@ libzmq_la_SOURCES = app_thread.hpp \ ypollset.hpp \ yqueue.hpp \ zmq_connecter.hpp \ - zmq_connecter_init.hpp \ zmq_decoder.hpp \ zmq_encoder.hpp \ zmq_engine.hpp \ + zmq_init.hpp \ zmq_listener.hpp \ - zmq_listener_init.hpp \ app_thread.cpp \ devpoll.cpp \ dispatcher.cpp \ @@ -161,12 +160,11 @@ libzmq_la_SOURCES = app_thread.hpp \ ypollset.cpp \ zmq.cpp \ zmq_connecter.cpp \ - zmq_connecter_init.cpp \ zmq_decoder.cpp \ zmq_encoder.cpp \ zmq_engine.cpp \ - zmq_listener.cpp \ - zmq_listener_init.cpp + zmq_init.cpp \ + zmq_listener.cpp libzmq_la_LDFLAGS = -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAFS@ diff --git a/src/i_inout.hpp b/src/i_inout.hpp index b82a476..8a0ce6a 100644 --- a/src/i_inout.hpp +++ b/src/i_inout.hpp @@ -22,6 +22,8 @@ #include "../bindings/c/zmq.h" +#include "stdint.hpp" + namespace zmq { @@ -47,8 +49,8 @@ namespace zmq // Return pointer to the owning socket. virtual class socket_base_t *get_owner () = 0; - // Returns the name of associated session. - virtual const char *get_session_name () = 0; + // Return ordinal number of the session. + virtual uint64_t get_ordinal () = 0; }; } diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 2a24858..1d4d695 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -37,11 +37,10 @@ #include "i_inout.hpp" zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, - const options_t &options_, const char *session_name_) : + const options_t &options_) : io_object_t (parent_), pgm_socket (true, options_), options (options_), - session_name (session_name_), inout (NULL) { } diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index fa84acb..91169f4 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -47,8 +47,7 @@ namespace zmq // Creates gm_engine. Underlying PGM connection is initialised // using network_ parameter. - pgm_receiver_t (class io_thread_t *parent_, const options_t &options_, - const char *session_name_); + pgm_receiver_t (class io_thread_t *parent_, const options_t &options_); ~pgm_receiver_t (); int init (bool udp_encapsulation_, const char *network_); @@ -94,9 +93,6 @@ namespace zmq // Socket options. options_t options; - // Name of the session associated with the connecter. - std::string session_name; - // Parent session. i_inout *inout; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 676ed93..880bb09 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -33,12 +33,11 @@ #include "wire.hpp" zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, - const options_t &options_, const char *session_name_) : + const options_t &options_) : io_object_t (parent_), encoder (0, false), pgm_socket (false, options_), options (options_), - session_name (session_name_), inout (NULL), out_buffer (NULL), out_buffer_size (0), diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 9a9844c..30b545d 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -42,8 +42,7 @@ namespace zmq { public: - pgm_sender_t (class io_thread_t *parent_, const options_t &options_, - const char *session_name_); + pgm_sender_t (class io_thread_t *parent_, const options_t &options_); ~pgm_sender_t (); int init (bool udp_encapsulation_, const char *network_); @@ -74,9 +73,6 @@ namespace zmq // Socket options. options_t options; - // Name of the session associated with the connecter. - std::string session_name; - // Poll handle associated with PGM socket. handle_t handle; handle_t uplink_handle; diff --git a/src/session.cpp b/src/session.cpp index 37f2720..a17e205 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -25,16 +25,41 @@ #include "pipe.hpp" zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const char *name_, const options_t &options_, bool reconnect_) : + const options_t &options_) : owned_t (parent_, owner_), in_pipe (NULL), active (true), out_pipe (NULL), engine (NULL), - name (name_), - options (options_), - reconnect (reconnect_) + options (options_) { + type = unnamed; + + // It's possible to register the session at this point as it will be + // searched for only on reconnect, i.e. no race condition (session found + // before it is plugged into it's I/O thread) is possible. + ordinal = owner->register_session (this); +} + +zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, + const options_t &options_, const char *name_) : + owned_t (parent_, owner_), + in_pipe (NULL), + active (true), + out_pipe (NULL), + engine (NULL), + options (options_) +{ + if (name_) { + type = named; + name = name_; + ordinal = 0; + } + else { + type = transient; + // TODO: Generate unique name here. + ordinal = 0; + } } zmq::session_t::~session_t () @@ -78,8 +103,8 @@ void zmq::session_t::detach (owned_t *reconnecter_) // Engine is terminating itself. No need to deallocate it from here. engine = NULL; - // In the case od anonymous connection, terminate the session. - if (name.empty ()) + // Terminate transient session. + if (type == transient) term (); } @@ -93,9 +118,11 @@ class zmq::socket_base_t *zmq::session_t::get_owner () return owner; } -const char *zmq::session_t::get_session_name () +uint64_t zmq::session_t::get_ordinal () { - return name.c_str (); + zmq_assert (type == unnamed); + zmq_assert (ordinal); + return ordinal; } void zmq::session_t::attach_pipes (class reader_t *inpipe_, @@ -181,11 +208,12 @@ void zmq::session_t::process_plug () void zmq::session_t::process_unplug () { - // Unregister the session from the socket. - if (!name.empty ()) { - bool ok = owner->unregister_session (name.c_str ()); - zmq_assert (ok); - } + // Unregister the session from the socket. There's nothing to do here + // for transient sessions. + if (type == unnamed) + owner->unregister_session (ordinal); + else if (type == named) + owner->unregister_session (name.c_str ()); // Ask associated pipes to terminate. if (in_pipe) { diff --git a/src/session.hpp b/src/session.hpp index 72e1d59..c60cfc7 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -34,8 +34,14 @@ namespace zmq { public: - session_t (object_t *parent_, socket_base_t *owner_, const char *name_, - const options_t &options_, bool reconnect_); + // Creates unnamed session. + session_t (object_t *parent_, socket_base_t *owner_, + const options_t &options_); + + // Creates named session. If name is NULL, transient session with + // auto-generated name is created. + session_t (object_t *parent_, socket_base_t *owner_, + const options_t &options_, const char *name_); // i_inout interface implementation. bool read (::zmq_msg_t *msg_); @@ -44,7 +50,7 @@ namespace zmq void detach (owned_t *reconnecter_); class io_thread_t *get_io_thread (); class socket_base_t *get_owner (); - const char *get_session_name (); + uint64_t get_ordinal (); // i_endpoint interface implementation. void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); @@ -73,6 +79,15 @@ namespace zmq struct i_engine *engine; + enum { + transient, + named, + unnamed + } type; + + // Ordinal of the session (if any). + uint64_t ordinal; + // The name of the session. One that is used to register it with // socket-level repository of sessions. std::string name; @@ -80,9 +95,6 @@ namespace zmq // Inherited socket options. options_t options; - // If true, reconnection is required after connection breaks. - bool reconnect; - session_t (const session_t&); void operator = (const session_t&); }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fde258c..43209d5 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -32,7 +32,6 @@ #include "session.hpp" #include "config.hpp" #include "owned.hpp" -#include "uuid.hpp" #include "pipe.hpp" #include "err.hpp" #include "platform.hpp" @@ -46,7 +45,8 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : app_thread (parent_), shutting_down (false), sent_seqnum (0), - processed_seqnum (0) + processed_seqnum (0), + next_ordinal (1) { } @@ -114,10 +114,6 @@ int zmq::socket_base_t::bind (const char *addr_) int zmq::socket_base_t::connect (const char *addr_) { - // Generate a unique name for the session. - std::string session_name ("#"); - session_name += uuid_t ().to_string (); - // Parse addr_ string. std::string addr_type; std::string addr_args; @@ -170,10 +166,10 @@ int zmq::socket_base_t::connect (const char *addr_) return 0; } - // Create the session. + // Create unnamed session. io_thread_t *io_thread = choose_io_thread (options.affinity); - session_t *session = new (std::nothrow) session_t (io_thread, this, - session_name.c_str (), options, true); + session_t *session = new (std::nothrow) session_t (io_thread, + this, options); zmq_assert (session); pipe_t *in_pipe = NULL; @@ -213,7 +209,7 @@ int zmq::socket_base_t::connect (const char *addr_) // it is established. zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( choose_io_thread (options.affinity), this, options, - session_name.c_str (), false); + session->get_ordinal (), false); zmq_assert (connecter); int rc = connecter->set_address (addr_args.c_str ()); if (rc != 0) { @@ -245,8 +241,7 @@ int zmq::socket_base_t::connect (const char *addr_) // PGM sender. pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( - choose_io_thread (options.affinity), options, - session_name.c_str ()); + choose_io_thread (options.affinity), options); zmq_assert (pgm_sender); int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ()); @@ -261,8 +256,7 @@ int zmq::socket_base_t::connect (const char *addr_) // PGM receiver. pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( - choose_io_thread (options.affinity), options, - session_name.c_str ()); + choose_io_thread (options.affinity), options); zmq_assert (pgm_receiver); int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ()); @@ -408,7 +402,8 @@ int zmq::socket_base_t::close () // Check whether there are no session leaks. sessions_sync.lock (); - zmq_assert (sessions.empty ()); + zmq_assert (named_sessions.empty ()); + zmq_assert (unnamed_sessions.empty ()); sessions_sync.unlock (); delete this; @@ -445,36 +440,74 @@ bool zmq::socket_base_t::register_session (const char *name_, session_t *session_) { sessions_sync.lock (); - bool registered = sessions.insert (std::make_pair (name_, session_)).second; + bool registered = + named_sessions.insert (std::make_pair (name_, session_)).second; sessions_sync.unlock (); return registered; } -bool zmq::socket_base_t::unregister_session (const char *name_) +void zmq::socket_base_t::unregister_session (const char *name_) { sessions_sync.lock (); - sessions_t::iterator it = sessions.find (name_); - bool unregistered = (it != sessions.end ()); - sessions.erase (it); + named_sessions_t::iterator it = named_sessions.find (name_); + zmq_assert (it != named_sessions.end ()); + named_sessions.erase (it); sessions_sync.unlock (); - return unregistered; } zmq::session_t *zmq::socket_base_t::find_session (const char *name_) { sessions_sync.lock (); - sessions_t::iterator it = sessions.find (name_); - if (it == sessions.end ()) { + named_sessions_t::iterator it = named_sessions.find (name_); + if (it == named_sessions.end ()) { + sessions_sync.unlock (); + return NULL; + } + session_t *session = it->second; + + // Prepare the session for subsequent attach command. + session->inc_seqnum (); + + sessions_sync.unlock (); + return session; +} + +uint64_t zmq::socket_base_t::register_session (session_t *session_) +{ + sessions_sync.lock (); + uint64_t ordinal = next_ordinal; + next_ordinal++; + unnamed_sessions.insert (std::make_pair (ordinal, session_)).second; + sessions_sync.unlock (); + return ordinal; +} + +void zmq::socket_base_t::unregister_session (uint64_t ordinal_) +{ + sessions_sync.lock (); + unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_); + zmq_assert (it != unnamed_sessions.end ()); + unnamed_sessions.erase (it); + sessions_sync.unlock (); +} + +zmq::session_t *zmq::socket_base_t::find_session (uint64_t ordinal_) +{ + sessions_sync.lock (); + + unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_); + if (it == unnamed_sessions.end ()) { sessions_sync.unlock (); return NULL; } + session_t *session = it->second; // Prepare the session for subsequent attach command. - it->second->inc_seqnum (); + session->inc_seqnum (); sessions_sync.unlock (); - return it->second; + return session; } void zmq::socket_base_t::kill (reader_t *pipe_) diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 79a8340..16553ea 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -34,6 +34,7 @@ #include "options.hpp" #include "stdint.hpp" #include "atomic_counter.hpp" +#include "stdint.hpp" namespace zmq { @@ -74,9 +75,15 @@ namespace zmq // commands as it is unacceptable to wait for the completion of the // action till user application yields control of the application // thread to 0MQ. Locking is used instead. + // There are two distinct types of sessions: those identified by name + // and those identified by ordinal number. Thus two sets of session + // management functions. bool register_session (const char *name_, class session_t *session_); - bool unregister_session (const char *name_); + void unregister_session (const char *name_); class session_t *find_session (const char *name_); + uint64_t register_session (class session_t *session_); + void unregister_session (uint64_t ordinal_); + class session_t *find_session (uint64_t ordinal_); // i_endpoint interface implementation. void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); @@ -144,15 +151,15 @@ namespace zmq // Sequence number of the last command processed by this object. uint64_t processed_seqnum; - // List of existing sessions. This list is never referenced from within - // the socket, instead it is used by I/O objects owned by the session. - // As those objects can live in different threads, the access is - // synchronised using 'sessions_sync' mutex. - // Local sessions are those named by the local instance of 0MQ. - // Remote sessions are the sessions who's identities are provided by - // the remote party. - typedef std::map sessions_t; - sessions_t sessions; + // Lists of existing sessions. This lists are never referenced from + // within the socket, instead they are used by I/O objects owned by + // the socket. As those objects can live in different threads, + // the access is synchronised by mutex. + typedef std::map named_sessions_t; + named_sessions_t named_sessions; + typedef std::map unnamed_sessions_t; + unnamed_sessions_t unnamed_sessions; + uint64_t next_ordinal; mutex_t sessions_sync; socket_base_t (const socket_base_t&); diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 5bda48d..8f95fc0 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -20,19 +20,20 @@ #include #include "zmq_connecter.hpp" -#include "zmq_connecter_init.hpp" +#include "zmq_engine.hpp" +#include "zmq_init.hpp" #include "io_thread.hpp" #include "err.hpp" zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, socket_base_t *owner_, const options_t &options_, - const char *session_name_, bool wait_) : + uint64_t session_ordinal_, bool wait_) : owned_t (parent_, owner_), io_object_t (parent_), handle_valid (false), wait (wait_), - options (options_), - session_name (session_name_) + session_ordinal (session_ordinal_), + options (options_) { } @@ -88,9 +89,9 @@ void zmq::zmq_connecter_t::out_event () } // Create an init object. - io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_connecter_init_t *init = new (std::nothrow) zmq_connecter_init_t ( - io_thread, owner, fd, options, session_name.c_str (), address.c_str ()); + zmq_init_t *init = new (std::nothrow) zmq_init_t ( + choose_io_thread (options.affinity), owner, + fd, options, true, address.c_str (), session_ordinal); zmq_assert (init); send_plug (init); send_own (owner, init); diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index acd3352..e5b4a70 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -36,7 +36,7 @@ namespace zmq public: zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_, - const options_t &options_, const char *session_name_, bool wait_); + const options_t &options_, uint64_t session_ordinal_, bool wait_); ~zmq_connecter_t (); // Set IP address to connect to. @@ -69,12 +69,12 @@ namespace zmq // If true, connecter is waiting a while before trying to connect. bool wait; + // Ordinal of the session to attach to. + uint64_t session_ordinal; + // Associated socket options. options_t options; - // Name of the session associated with the connecter. - std::string session_name; - // Address to connect to. std::string address; diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp deleted file mode 100644 index f8436a3..0000000 --- a/src/zmq_connecter_init.cpp +++ /dev/null @@ -1,132 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include - -#include "zmq_connecter_init.hpp" -#include "zmq_connecter.hpp" -#include "io_thread.hpp" -#include "session.hpp" -#include "err.hpp" - -zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_, - socket_base_t *owner_, fd_t fd_, const options_t &options_, - const char *session_name_, const char *address_) : - owned_t (parent_, owner_), - options (options_), - session_name (session_name_) -{ - // Create associated engine object. - engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, true, - address_); - zmq_assert (engine); -} - -zmq::zmq_connecter_init_t::~zmq_connecter_init_t () -{ - if (engine) - delete engine; -} - -bool zmq::zmq_connecter_init_t::read (::zmq_msg_t *msg_) -{ - // Send identity. - int rc = zmq_msg_init_size (msg_, options.identity.size ()); - zmq_assert (rc == 0); - memcpy (zmq_msg_data (msg_), options.identity.c_str (), - options.identity.size ()); - - // Initialisation is done at this point. Disconnect the engine from - // the init object. - engine->unplug (); - - // Find the session associated with this connecter. If it doesn't exist - // drop the newly created connection. If it does, attach it to the - // connection. - session_t *session = NULL; - if (!session_name.empty ()) - session = owner->find_session (session_name.c_str ()); - if (!session) { - - // TODO: - // The socket is already closing. The session is already shut down, - // so no point in continuing with connecting. Shut the connection down. - zmq_assert (false); - } - - // No need to increment seqnum as it was alredy incremented above. - send_attach (session, engine, false); - engine = NULL; - - // Destroy the init object. - term (); - - return true; -} - -bool zmq::zmq_connecter_init_t::write (::zmq_msg_t *msg_) -{ - return false; -} - -void zmq::zmq_connecter_init_t::flush () -{ - // We are not expecting any messages. No point in flushing. -} - -void zmq::zmq_connecter_init_t::detach (owned_t *reconnecter_) -{ - // Plug in the reconnecter object. - zmq_assert (reconnecter_); - send_plug (reconnecter_); - send_own (owner, reconnecter_); - - // This function is called by engine when disconnection occurs. - // The engine will destroy itself, so we just drop the pointer here and - // start termination of the init object. - engine = NULL; - term (); -} - -zmq::io_thread_t *zmq::zmq_connecter_init_t::get_io_thread () -{ - return choose_io_thread (options.affinity); -} - -class zmq::socket_base_t *zmq::zmq_connecter_init_t::get_owner () -{ - return owner; -} - -const char *zmq::zmq_connecter_init_t::get_session_name () -{ - return session_name.c_str (); -} - -void zmq::zmq_connecter_init_t::process_plug () -{ - zmq_assert (engine); - engine->plug (this); -} - -void zmq::zmq_connecter_init_t::process_unplug () -{ - if (engine) - engine->unplug (); -} diff --git a/src/zmq_connecter_init.hpp b/src/zmq_connecter_init.hpp deleted file mode 100644 index 03ccd24..0000000 --- a/src/zmq_connecter_init.hpp +++ /dev/null @@ -1,79 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_ZMQ_CONNECTER_INIT_HPP_INCLUDED__ -#define __ZMQ_ZMQ_CONNECTER_INIT_HPP_INCLUDED__ - -#include - -#include "i_inout.hpp" -#include "owned.hpp" -#include "zmq_engine.hpp" -#include "stdint.hpp" -#include "fd.hpp" -#include "options.hpp" - -namespace zmq -{ - - // The class handles initialisation phase of native 0MQ wire-level - // protocol on the connecting side of the connection. - - class zmq_connecter_init_t : public owned_t, public i_inout - { - public: - - zmq_connecter_init_t (class io_thread_t *parent_, socket_base_t *owner_, - fd_t fd_, const options_t &options, const char *session_name_, - const char *address_); - ~zmq_connecter_init_t (); - - private: - - // i_inout interface implementation. - bool read (::zmq_msg_t *msg_); - bool write (::zmq_msg_t *msg_); - void flush (); - void detach (owned_t *reconnecter_); - class io_thread_t *get_io_thread (); - class socket_base_t *get_owner (); - const char *get_session_name (); - - // Handlers for incoming commands. - void process_plug (); - void process_unplug (); - - // Engine is created by zmq_connecter_init_t object. Once the - // initialisation phase is over it is passed to a session object, - // possibly running in a different I/O thread. - zmq_engine_t *engine; - - // Associated socket options. - options_t options; - - // Name of the session to bind new connection to. - std::string session_name; - - zmq_connecter_init_t (const zmq_connecter_init_t&); - void operator = (const zmq_connecter_init_t&); - }; - -} - -#endif diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index c2878a7..cfc87a7 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -159,7 +159,7 @@ void zmq::zmq_engine_t::error () // Ask it to wait for a while before reconnecting. reconnecter = new (std::nothrow) zmq_connecter_t ( inout->get_io_thread (), inout->get_owner (), - options, inout->get_session_name (), true); + options, inout->get_ordinal (), true); zmq_assert (reconnecter); reconnecter->set_address (address.c_str ()); } diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp new file mode 100644 index 0000000..e526b34 --- /dev/null +++ b/src/zmq_init.cpp @@ -0,0 +1,195 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "zmq_init.hpp" +#include "zmq_engine.hpp" +#include "io_thread.hpp" +#include "session.hpp" +#include "err.hpp" + +zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_, + fd_t fd_, const options_t &options_, bool reconnect_, + const char *address_, uint64_t session_ordinal_) : + owned_t (parent_, owner_), + sent (false), + received (false), + session_ordinal (session_ordinal_), + options (options_) +{ + // Create the engine object for this connection. + engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, + reconnect_, address_); + zmq_assert (engine); +} + +zmq::zmq_init_t::~zmq_init_t () +{ + if (engine) + delete engine; +} + +bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) +{ + // If the identity was already sent, do nothing. + if (sent) + return false; + + // Send the identity. + int rc = zmq_msg_init_size (msg_, options.identity.size ()); + zmq_assert (rc == 0); + memcpy (zmq_msg_data (msg_), options.identity.c_str (), + options.identity.size ()); + sent = true; + + // If initialisation is done, pass the engine to the session and + // destroy the init object. + finalise (); + + return true; +} + +bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) +{ + // If identity was already received, we are not interested + // in subsequent messages. + if (received) + return false; + + // Retreieve the remote identity. + peer_identity.assign ((const char*) zmq_msg_data (msg_), + zmq_msg_size (msg_)); + received = true; + + return true; +} + +void zmq::zmq_init_t::flush () +{ + // Check if there's anything to flush. + if (!received) + return; + + // If initialisation is done, pass the engine to the session and + // destroy the init object. + finalise (); +} + +void zmq::zmq_init_t::detach (owned_t *reconnecter_) +{ + // This function is called by engine when disconnection occurs. + + // If required, launch the reconnecter. + if (reconnecter_) { + send_plug (reconnecter_); + send_own (owner, reconnecter_); + } + + // The engine will destroy itself, so let's just drop the pointer here and + // start termination of the init object. + engine = NULL; + term (); +} + +zmq::io_thread_t *zmq::zmq_init_t::get_io_thread () +{ + return choose_io_thread (options.affinity); +} + +class zmq::socket_base_t *zmq::zmq_init_t::get_owner () +{ + return owner; +} + +uint64_t zmq::zmq_init_t::get_ordinal () +{ + zmq_assert (false); +} + +void zmq::zmq_init_t::process_plug () +{ + zmq_assert (engine); + engine->plug (this); +} + +void zmq::zmq_init_t::process_unplug () +{ + if (engine) + engine->unplug (); +} + +void zmq::zmq_init_t::finalise () +{ + if (sent && received) { + + // Disconnect the engine from the init object. + engine->unplug (); + + session_t *session = NULL; + + // If we have the session ordinal, let's use it to find the session. + // If it is not found, it means socket is already being shut down + // and the session have been deallocated. + // TODO: We should check whether the name of the peer haven't changed + // upon reconnection. + if (session_ordinal) { + session = owner->find_session (session_ordinal); + if (!session) { + term (); + return; + } + } + + // If the peer has a unique name, find the associated session. If it + // doesn't exist, create it. + else if (!peer_identity.empty ()) { + session = owner->find_session (peer_identity.c_str ()); + if (!session) { + session = new (std::nothrow) session_t ( + choose_io_thread (options.affinity), owner, options, + peer_identity.c_str ()); + zmq_assert (session); + send_plug (session); + send_own (owner, session); + + // Reserve a sequence number for following 'attach' command. + session->inc_seqnum (); + } + } + + // If the other party has no specific identity, let's create a + // transient session. + else { + session = new (std::nothrow) session_t ( + choose_io_thread (options.affinity), owner, options, NULL); + zmq_assert (session); + send_plug (session); + send_own (owner, session); + + // Reserve a sequence number for following 'attach' command. + session->inc_seqnum (); + } + + // No need to increment seqnum as it was laready incremented above. + send_attach (session, engine, false); + + // Destroy the init object. + engine = NULL; + term (); + } +} diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp new file mode 100644 index 0000000..a17d621 --- /dev/null +++ b/src/zmq_init.hpp @@ -0,0 +1,89 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__ +#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__ + +#include + +#include "i_inout.hpp" +#include "i_engine.hpp" +#include "owned.hpp" +#include "fd.hpp" +#include "stdint.hpp" +#include "options.hpp" +#include "stdint.hpp" + +namespace zmq +{ + + // The class handles initialisation phase of 0MQ wire-level protocol. + + class zmq_init_t : public owned_t, public i_inout + { + public: + + zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_, + fd_t fd_, const options_t &options_, bool reconnect_, + const char *address_, uint64_t session_ordinal_); + ~zmq_init_t (); + + private: + + void finalise (); + + // i_inout interface implementation. + bool read (::zmq_msg_t *msg_); + bool write (::zmq_msg_t *msg_); + void flush (); + void detach (owned_t *reconnecter_); + class io_thread_t *get_io_thread (); + class socket_base_t *get_owner (); + uint64_t get_ordinal (); + + // Handlers for incoming commands. + void process_plug (); + void process_unplug (); + + // Associated wite-protocol engine. + i_engine *engine; + + // True if our own identity was already sent to the peer. + bool sent; + + // True if peer's identity was already received. + bool received; + + // Identity of the peer socket. + std::string peer_identity; + + // TCP connecter creates session before the name of the peer is known. + // Thus we know only its ordinal number. + uint64_t session_ordinal; + + // Associated socket options. + options_t options; + + zmq_init_t (const zmq_init_t&); + void operator = (const zmq_init_t&); + }; + +} + +#endif diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index 5c7552b..6a7e2fd 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -20,7 +20,7 @@ #include #include "zmq_listener.hpp" -#include "zmq_listener_init.hpp" +#include "zmq_init.hpp" #include "io_thread.hpp" #include "err.hpp" @@ -64,8 +64,8 @@ void zmq::zmq_listener_t::in_event () // Create an init object. io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_listener_init_t *init = new (std::nothrow) zmq_listener_init_t ( - io_thread, owner, fd, options); + zmq_init_t *init = new (std::nothrow) zmq_init_t ( + io_thread, owner, fd, options, false, NULL, 0); zmq_assert (init); send_plug (init); send_own (owner, init); diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp deleted file mode 100644 index f7b3001..0000000 --- a/src/zmq_listener_init.cpp +++ /dev/null @@ -1,137 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include - -#include "zmq_listener_init.hpp" -#include "io_thread.hpp" -#include "session.hpp" -#include "err.hpp" - -zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_, - socket_base_t *owner_, fd_t fd_, const options_t &options_) : - owned_t (parent_, owner_), - options (options_), - has_peer_identity (false) -{ - // Create associated engine object. - engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, - false, NULL); - zmq_assert (engine); -} - -zmq::zmq_listener_init_t::~zmq_listener_init_t () -{ - if (engine) - delete engine; -} - -bool zmq::zmq_listener_init_t::read (::zmq_msg_t *msg_) -{ - return false; -} - -bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_) -{ - // Once we've got peer's identity we aren't interested in subsequent - // messages. - if (has_peer_identity) - return false; - - // Retreieve the remote identity. We'll use it as a local session name. - has_peer_identity = true; - peer_identity.assign ((const char*) zmq_msg_data (msg_), - zmq_msg_size (msg_)); - - return true; -} - -void zmq::zmq_listener_init_t::flush () -{ - if (!has_peer_identity) - return; - - // Initialisation is done. Disconnect the engine from the init object. - engine->unplug (); - - // Have a look whether the session already exists. If it does, attach it - // to the engine. If it doesn't create it first. - session_t *session = NULL; - if (!peer_identity.empty ()) - session = owner->find_session (peer_identity.c_str ()); - if (!session) { - io_thread_t *io_thread = choose_io_thread (options.affinity); - session = new (std::nothrow) session_t (io_thread, owner, - peer_identity.c_str (), options, false); - zmq_assert (session); - send_plug (session); - send_own (owner, session); - - // Reserve a sequence number for following 'attach' command. - session->inc_seqnum (); - } - - // No need to increment seqnum as it was laready incremented above. - send_attach (session, engine, false); - - engine = NULL; - - // Destroy the init object. - term (); -} - -void zmq::zmq_listener_init_t::detach (owned_t *reconnecter_) -{ - // On the listening side of the connection we are never reconnecting. - zmq_assert (reconnecter_ == NULL); - - // This function is called by engine when disconnection occurs. - // The engine will destroy itself, so we just drop the pointer here and - // start termination of the init object. - engine = NULL; - term (); -} - -zmq::io_thread_t *zmq::zmq_listener_init_t::get_io_thread () -{ - return choose_io_thread (options.affinity); -} - -class zmq::socket_base_t *zmq::zmq_listener_init_t::get_owner () -{ - return owner; -} - -const char *zmq::zmq_listener_init_t::get_session_name () -{ - zmq_assert (false); - return NULL; -} - -void zmq::zmq_listener_init_t::process_plug () -{ - zmq_assert (engine); - engine->plug (this); -} - -void zmq::zmq_listener_init_t::process_unplug () -{ - if (engine) - engine->unplug (); -} diff --git a/src/zmq_listener_init.hpp b/src/zmq_listener_init.hpp deleted file mode 100644 index d7fde02..0000000 --- a/src/zmq_listener_init.hpp +++ /dev/null @@ -1,79 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_ZMQ_LISTENER_INIT_HPP_INCLUDED__ -#define __ZMQ_ZMQ_LISTENER_INIT_HPP_INCLUDED__ - -#include - -#include "i_inout.hpp" -#include "owned.hpp" -#include "zmq_engine.hpp" -#include "stdint.hpp" -#include "fd.hpp" -#include "options.hpp" - -namespace zmq -{ - - // The class handles initialisation phase of native 0MQ wire-level - // protocol on the listening side of the connection. - - class zmq_listener_init_t : public owned_t, public i_inout - { - public: - - zmq_listener_init_t (class io_thread_t *parent_, socket_base_t *owner_, - fd_t fd_, const options_t &options); - ~zmq_listener_init_t (); - - private: - - // i_inout interface implementation. - bool read (::zmq_msg_t *msg_); - bool write (::zmq_msg_t *msg_); - void flush (); - void detach (owned_t *reconnecter_); - class io_thread_t *get_io_thread (); - class socket_base_t *get_owner (); - const char *get_session_name (); - - // Handlers for incoming commands. - void process_plug (); - void process_unplug (); - - // Engine is created by zmq_listener_init_t object. Once the - // initialisation phase is over it is passed to a session object, - // possibly running in a different I/O thread. - zmq_engine_t *engine; - - // Associated socket options. - options_t options; - - // Indetity on the other end of the connection. - bool has_peer_identity; - std::string peer_identity; - - zmq_listener_init_t (const zmq_listener_init_t&); - void operator = (const zmq_listener_init_t&); - }; - -} - -#endif -- cgit v1.2.3