diff options
-rw-r--r-- | src/connect_session.cpp | 45 | ||||
-rw-r--r-- | src/connect_session.hpp | 10 | ||||
-rw-r--r-- | src/named_session.cpp | 12 | ||||
-rw-r--r-- | src/named_session.hpp | 2 | ||||
-rw-r--r-- | src/session.cpp | 38 | ||||
-rw-r--r-- | src/session.hpp | 12 | ||||
-rw-r--r-- | src/socket_base.cpp | 4 | ||||
-rw-r--r-- | src/transient_session.cpp | 4 | ||||
-rw-r--r-- | src/transient_session.hpp | 2 |
9 files changed, 97 insertions, 32 deletions
diff --git a/src/connect_session.cpp b/src/connect_session.cpp index 0df9854..c0951cb 100644 --- a/src/connect_session.cpp +++ b/src/connect_session.cpp @@ -28,12 +28,15 @@ zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_, const char *protocol_, const char *address_) : session_t (io_thread_, socket_, options_), protocol (protocol_), - address (address_) + address (address_), + connected (false) { } zmq::connect_session_t::~connect_session_t () { + if (connected && !peer_identity.empty ()) + unregister_session (peer_identity); } void zmq::connect_session_t::process_plug () @@ -107,8 +110,46 @@ void zmq::connect_session_t::start_connecting (bool wait_) zmq_assert (false); } -void zmq::connect_session_t::attached (const blob_t &peer_identity_) +bool zmq::connect_session_t::attached (const blob_t &peer_identity_) { + // If there was no previous connection... + if (!connected) { + + // Peer has transient identity. + if (peer_identity_.empty () || peer_identity_ [0] == 0) { + connected = true; + return true; + } + + // Peer has strong identity. Let's register it and check whether noone + // else is using the same identity. + if (!register_session (peer_identity_, this)) { + log ("DPID: duplicate peer identity - disconnecting peer"); + return false; + } + connected = true; + peer_identity = peer_identity_; + return true; + } + + // New engine from listener can conflict with existing engine. + // Alternatively, new engine created by reconnection process can + // conflict with engine supplied by listener in the meantime. + if (has_engine ()) { + log ("DPID: duplicate peer identity - disconnecting peer"); + return false; + } + + // If there have been a connection before, we have to check whether + // peer's identity haven't changed in the meantime. + if ((peer_identity_.empty () || peer_identity_ [0] == 0) && + peer_identity.empty ()) + return true; + if (peer_identity != peer_identity_) { + log ("CHID: peer have changed identity - disconnecting peer"); + return false; + } + return true; } void zmq::connect_session_t::detached () diff --git a/src/connect_session.hpp b/src/connect_session.hpp index 327ad2d..93e2704 100644 --- a/src/connect_session.hpp +++ b/src/connect_session.hpp @@ -23,6 +23,7 @@ #include <string> +#include "blob.hpp" #include "session.hpp" namespace zmq @@ -43,7 +44,7 @@ namespace zmq private: // Handlers for events from session base class. - void attached (const blob_t &peer_identity_); + bool attached (const blob_t &peer_identity_); void detached (); // Start the connection process. @@ -56,6 +57,13 @@ namespace zmq std::string protocol; std::string address; + // If true, the session was already connected to the peer. + bool connected; + + // Identity of the peer. If 'connected' is false, it has no meaning. + // Otherwise, if it's empty, the peer has transient identity. + blob_t peer_identity; + connect_session_t (const connect_session_t&); const connect_session_t &operator = (const connect_session_t&); }; diff --git a/src/named_session.cpp b/src/named_session.cpp index 7e2c49e..34f4af4 100644 --- a/src/named_session.cpp +++ b/src/named_session.cpp @@ -45,11 +45,17 @@ zmq::named_session_t::~named_session_t () unregister_session (peer_identity); } -void zmq::named_session_t::attached (const blob_t &peer_identity_) +bool zmq::named_session_t::attached (const blob_t &peer_identity_) { - // The owner should take care to not attach the session - // to an unrelated peer. + // Double check that identities match. zmq_assert (peer_identity == peer_identity_); + + // If the session already has an engine attached, destroy new one. + if (has_engine ()) { + log ("DPID: duplicate peer identity - disconnecting peer"); + return false; + } + return true; } void zmq::named_session_t::detached () diff --git a/src/named_session.hpp b/src/named_session.hpp index fac4745..e3b5aa3 100644 --- a/src/named_session.hpp +++ b/src/named_session.hpp @@ -40,7 +40,7 @@ namespace zmq ~named_session_t (); // Handlers for events from session base class. - void attached (const blob_t &peer_identity_); + bool attached (const blob_t &peer_identity_); void detached (); private: diff --git a/src/session.cpp b/src/session.cpp index 33c25d9..3ba971f 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -214,26 +214,25 @@ void zmq::session_t::process_plug () void zmq::session_t::process_attach (i_engine *engine_, const blob_t &peer_identity_) { - // If some other object (e.g. init) notifies us that the connection failed - // we need to start the reconnection process. - if (!engine_) { - zmq_assert (!engine); - detached (); - return; - } - // If we are already terminating, we destroy the engine straight away. // Note that we don't have to unplug it before deleting as it's not // yet plugged to the session. if (state == terminating) { - delete engine_; + if (engine_) + delete engine_; return; } - // If the session already has an engine attached, destroy new one. - // Note new engine is not plugged in yet, we don't have to unplug it. - if (engine) { - log ("DPID: duplicate peer identity - disconnecting peer"); + // If some other object (e.g. init) notifies us that the connection failed + // without creating an engine we need to start the reconnection process. + if (!engine_) { + zmq_assert (!engine); + detached (); + return; + } + + // Trigger the notfication event about the attachment. + if (!attached (peer_identity_)) { delete engine_; return; } @@ -248,8 +247,8 @@ void zmq::session_t::process_attach (i_engine *engine_, // Create the pipes, as required. if (options.requires_in) { - create_pipe (socket, this, options.hwm, options.swap, &socket_reader, - &out_pipe); + create_pipe (socket, this, options.hwm, options.swap, + &socket_reader, &out_pipe); out_pipe->set_event_sink (this); } if (options.requires_out) { @@ -264,11 +263,9 @@ void zmq::session_t::process_attach (i_engine *engine_, } // Plug in the engine. + zmq_assert (!engine); engine = engine_; engine->plug (io_thread, this); - - // Trigger the notfication about the attachment. - attached (peer_identity_); } void zmq::session_t::detach () @@ -330,6 +327,11 @@ void zmq::session_t::timer_event (int id_) proceed_with_term (); } +bool zmq::session_t::has_engine () +{ + return engine != NULL; +} + bool zmq::session_t::register_session (const blob_t &name_, session_t *session_) { return socket->register_session (name_, session_); diff --git a/src/session.hpp b/src/session.hpp index 499c074..570daa1 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -69,12 +69,16 @@ namespace zmq void terminate (); // Two events for the derived session type. Attached is triggered - // when session is attached to a peer, detached is triggered at the - // beginning of the termination process when session is about to - // be detached from the peer. - virtual void attached (const blob_t &peer_identity_) = 0; + // when session is attached to a peer. The function can reject the new + // peer by returning false. Detached is triggered at the beginning of + // the termination process when session is about to be detached from + // the peer. + virtual bool attached (const blob_t &peer_identity_) = 0; virtual void detached () = 0; + // Returns true if there is an engine attached to the session. + bool has_engine (); + // Allows derives session types to (un)register session names. bool register_session (const blob_t &name_, class session_t *session_); void unregister_session (const blob_t &name_); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 4cefb6f..abeddb9 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -619,7 +619,9 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_) session_t *session = it->second; // Prepare the session for subsequent attach command. - session->inc_seqnum (); + // Note the connect sessions have NULL pointers registered here. + if (session) + session->inc_seqnum (); sessions_sync.unlock (); return session; diff --git a/src/transient_session.cpp b/src/transient_session.cpp index a08f541..10a086f 100644 --- a/src/transient_session.cpp +++ b/src/transient_session.cpp @@ -30,8 +30,10 @@ zmq::transient_session_t::~transient_session_t () { } -void zmq::transient_session_t::attached (const blob_t &peer_identity_) +bool zmq::transient_session_t::attached (const blob_t &peer_identity_) { + // Transient session is always valid. + return true; } void zmq::transient_session_t::detached () diff --git a/src/transient_session.hpp b/src/transient_session.hpp index 03f058d..c70a2d7 100644 --- a/src/transient_session.hpp +++ b/src/transient_session.hpp @@ -40,7 +40,7 @@ namespace zmq private: // Handlers for events from session base class. - void attached (const blob_t &peer_identity_); + bool attached (const blob_t &peer_identity_); void detached (); transient_session_t (const transient_session_t&); |