summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/connect_session.cpp45
-rw-r--r--src/connect_session.hpp10
-rw-r--r--src/named_session.cpp12
-rw-r--r--src/named_session.hpp2
-rw-r--r--src/session.cpp38
-rw-r--r--src/session.hpp12
-rw-r--r--src/socket_base.cpp4
-rw-r--r--src/transient_session.cpp4
-rw-r--r--src/transient_session.hpp2
9 files changed, 97 insertions, 32 deletions
diff --git a/src/connect_session.cpp b/src/connect_session.cpp
index 0df9854..c0951cb 100644
--- a/src/connect_session.cpp
+++ b/src/connect_session.cpp
@@ -28,12 +28,15 @@ zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_,
const char *protocol_, const char *address_) :
session_t (io_thread_, socket_, options_),
protocol (protocol_),
- address (address_)
+ address (address_),
+ connected (false)
{
}
zmq::connect_session_t::~connect_session_t ()
{
+ if (connected && !peer_identity.empty ())
+ unregister_session (peer_identity);
}
void zmq::connect_session_t::process_plug ()
@@ -107,8 +110,46 @@ void zmq::connect_session_t::start_connecting (bool wait_)
zmq_assert (false);
}
-void zmq::connect_session_t::attached (const blob_t &peer_identity_)
+bool zmq::connect_session_t::attached (const blob_t &peer_identity_)
{
+ // If there was no previous connection...
+ if (!connected) {
+
+ // Peer has transient identity.
+ if (peer_identity_.empty () || peer_identity_ [0] == 0) {
+ connected = true;
+ return true;
+ }
+
+ // Peer has strong identity. Let's register it and check whether noone
+ // else is using the same identity.
+ if (!register_session (peer_identity_, this)) {
+ log ("DPID: duplicate peer identity - disconnecting peer");
+ return false;
+ }
+ connected = true;
+ peer_identity = peer_identity_;
+ return true;
+ }
+
+ // New engine from listener can conflict with existing engine.
+ // Alternatively, new engine created by reconnection process can
+ // conflict with engine supplied by listener in the meantime.
+ if (has_engine ()) {
+ log ("DPID: duplicate peer identity - disconnecting peer");
+ return false;
+ }
+
+ // If there have been a connection before, we have to check whether
+ // peer's identity haven't changed in the meantime.
+ if ((peer_identity_.empty () || peer_identity_ [0] == 0) &&
+ peer_identity.empty ())
+ return true;
+ if (peer_identity != peer_identity_) {
+ log ("CHID: peer have changed identity - disconnecting peer");
+ return false;
+ }
+ return true;
}
void zmq::connect_session_t::detached ()
diff --git a/src/connect_session.hpp b/src/connect_session.hpp
index 327ad2d..93e2704 100644
--- a/src/connect_session.hpp
+++ b/src/connect_session.hpp
@@ -23,6 +23,7 @@
#include <string>
+#include "blob.hpp"
#include "session.hpp"
namespace zmq
@@ -43,7 +44,7 @@ namespace zmq
private:
// Handlers for events from session base class.
- void attached (const blob_t &peer_identity_);
+ bool attached (const blob_t &peer_identity_);
void detached ();
// Start the connection process.
@@ -56,6 +57,13 @@ namespace zmq
std::string protocol;
std::string address;
+ // If true, the session was already connected to the peer.
+ bool connected;
+
+ // Identity of the peer. If 'connected' is false, it has no meaning.
+ // Otherwise, if it's empty, the peer has transient identity.
+ blob_t peer_identity;
+
connect_session_t (const connect_session_t&);
const connect_session_t &operator = (const connect_session_t&);
};
diff --git a/src/named_session.cpp b/src/named_session.cpp
index 7e2c49e..34f4af4 100644
--- a/src/named_session.cpp
+++ b/src/named_session.cpp
@@ -45,11 +45,17 @@ zmq::named_session_t::~named_session_t ()
unregister_session (peer_identity);
}
-void zmq::named_session_t::attached (const blob_t &peer_identity_)
+bool zmq::named_session_t::attached (const blob_t &peer_identity_)
{
- // The owner should take care to not attach the session
- // to an unrelated peer.
+ // Double check that identities match.
zmq_assert (peer_identity == peer_identity_);
+
+ // If the session already has an engine attached, destroy new one.
+ if (has_engine ()) {
+ log ("DPID: duplicate peer identity - disconnecting peer");
+ return false;
+ }
+ return true;
}
void zmq::named_session_t::detached ()
diff --git a/src/named_session.hpp b/src/named_session.hpp
index fac4745..e3b5aa3 100644
--- a/src/named_session.hpp
+++ b/src/named_session.hpp
@@ -40,7 +40,7 @@ namespace zmq
~named_session_t ();
// Handlers for events from session base class.
- void attached (const blob_t &peer_identity_);
+ bool attached (const blob_t &peer_identity_);
void detached ();
private:
diff --git a/src/session.cpp b/src/session.cpp
index 33c25d9..3ba971f 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -214,26 +214,25 @@ void zmq::session_t::process_plug ()
void zmq::session_t::process_attach (i_engine *engine_,
const blob_t &peer_identity_)
{
- // If some other object (e.g. init) notifies us that the connection failed
- // we need to start the reconnection process.
- if (!engine_) {
- zmq_assert (!engine);
- detached ();
- return;
- }
-
// If we are already terminating, we destroy the engine straight away.
// Note that we don't have to unplug it before deleting as it's not
// yet plugged to the session.
if (state == terminating) {
- delete engine_;
+ if (engine_)
+ delete engine_;
return;
}
- // If the session already has an engine attached, destroy new one.
- // Note new engine is not plugged in yet, we don't have to unplug it.
- if (engine) {
- log ("DPID: duplicate peer identity - disconnecting peer");
+ // If some other object (e.g. init) notifies us that the connection failed
+ // without creating an engine we need to start the reconnection process.
+ if (!engine_) {
+ zmq_assert (!engine);
+ detached ();
+ return;
+ }
+
+ // Trigger the notfication event about the attachment.
+ if (!attached (peer_identity_)) {
delete engine_;
return;
}
@@ -248,8 +247,8 @@ void zmq::session_t::process_attach (i_engine *engine_,
// Create the pipes, as required.
if (options.requires_in) {
- create_pipe (socket, this, options.hwm, options.swap, &socket_reader,
- &out_pipe);
+ create_pipe (socket, this, options.hwm, options.swap,
+ &socket_reader, &out_pipe);
out_pipe->set_event_sink (this);
}
if (options.requires_out) {
@@ -264,11 +263,9 @@ void zmq::session_t::process_attach (i_engine *engine_,
}
// Plug in the engine.
+ zmq_assert (!engine);
engine = engine_;
engine->plug (io_thread, this);
-
- // Trigger the notfication about the attachment.
- attached (peer_identity_);
}
void zmq::session_t::detach ()
@@ -330,6 +327,11 @@ void zmq::session_t::timer_event (int id_)
proceed_with_term ();
}
+bool zmq::session_t::has_engine ()
+{
+ return engine != NULL;
+}
+
bool zmq::session_t::register_session (const blob_t &name_, session_t *session_)
{
return socket->register_session (name_, session_);
diff --git a/src/session.hpp b/src/session.hpp
index 499c074..570daa1 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -69,12 +69,16 @@ namespace zmq
void terminate ();
// Two events for the derived session type. Attached is triggered
- // when session is attached to a peer, detached is triggered at the
- // beginning of the termination process when session is about to
- // be detached from the peer.
- virtual void attached (const blob_t &peer_identity_) = 0;
+ // when session is attached to a peer. The function can reject the new
+ // peer by returning false. Detached is triggered at the beginning of
+ // the termination process when session is about to be detached from
+ // the peer.
+ virtual bool attached (const blob_t &peer_identity_) = 0;
virtual void detached () = 0;
+ // Returns true if there is an engine attached to the session.
+ bool has_engine ();
+
// Allows derives session types to (un)register session names.
bool register_session (const blob_t &name_, class session_t *session_);
void unregister_session (const blob_t &name_);
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 4cefb6f..abeddb9 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -619,7 +619,9 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)
session_t *session = it->second;
// Prepare the session for subsequent attach command.
- session->inc_seqnum ();
+ // Note the connect sessions have NULL pointers registered here.
+ if (session)
+ session->inc_seqnum ();
sessions_sync.unlock ();
return session;
diff --git a/src/transient_session.cpp b/src/transient_session.cpp
index a08f541..10a086f 100644
--- a/src/transient_session.cpp
+++ b/src/transient_session.cpp
@@ -30,8 +30,10 @@ zmq::transient_session_t::~transient_session_t ()
{
}
-void zmq::transient_session_t::attached (const blob_t &peer_identity_)
+bool zmq::transient_session_t::attached (const blob_t &peer_identity_)
{
+ // Transient session is always valid.
+ return true;
}
void zmq::transient_session_t::detached ()
diff --git a/src/transient_session.hpp b/src/transient_session.hpp
index 03f058d..c70a2d7 100644
--- a/src/transient_session.hpp
+++ b/src/transient_session.hpp
@@ -40,7 +40,7 @@ namespace zmq
private:
// Handlers for events from session base class.
- void attached (const blob_t &peer_identity_);
+ bool attached (const blob_t &peer_identity_);
void detached ();
transient_session_t (const transient_session_t&);