From cdc2efe9b5f0d1f45065b1c32e5eabd7e9f78a12 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 13 Feb 2010 13:07:33 +0100 Subject: Multi-hop REQ/REP, part VII., identity-related algorithms rewritten --- src/session.cpp | 114 ++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 69 insertions(+), 45 deletions(-) (limited to 'src/session.cpp') diff --git a/src/session.cpp b/src/session.cpp index c92fb0c..1fab3c2 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -32,9 +32,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, out_pipe (NULL), engine (NULL), options (options_) -{ - type = unnamed; - +{ // It's possible to register the session at this point as it will be // searched for only on reconnect, i.e. no race condition (session found // before it is plugged into it's I/O thread) is possible. @@ -52,13 +50,20 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, ordinal (0), options (options_) { + +if (!peer_identity_size_) + + // If peer identity is not supplied, leave it empty. if (peer_identity_size_) { - type = named; peer_identity.assign ((char*) peer_identity_, peer_identity_size_); - } - else { - type = transient; - // TODO: Generate unique name here. + if (!owner->register_session (peer_identity_size_, peer_identity_, + this)) { + + // TODO: There's already a session with the specified + // identity. We should presumably syslog it and drop the + // session. + zmq_assert (false); + } } } @@ -104,7 +109,7 @@ void zmq::session_t::detach (owned_t *reconnecter_) engine = NULL; // Terminate transient session. - if (type == transient) + if (!ordinal && peer_identity.empty ()) term (); } @@ -120,7 +125,6 @@ class zmq::socket_base_t *zmq::session_t::get_owner () uint64_t zmq::session_t::get_ordinal () { - zmq_assert (type == unnamed); zmq_assert (ordinal); return ordinal; } @@ -168,13 +172,62 @@ void zmq::session_t::revive (reader_t *pipe_) void zmq::session_t::process_plug () { - // Register the session with the socket. +} + +void zmq::session_t::process_unplug () +{ + // Unregister the session from the socket. + if (ordinal) + owner->unregister_session (ordinal); + else if (!peer_identity.empty ()) + owner->unregister_session ((unsigned char) peer_identity.size (), + (unsigned char*) peer_identity.data ()); + + // Ask associated pipes to terminate. + if (in_pipe) { + in_pipe->term (); + in_pipe = NULL; + } + if (out_pipe) { + out_pipe->term (); + out_pipe = NULL; + } + + if (engine) { + engine->unplug (); + delete engine; + engine = NULL; + } +} + +void zmq::session_t::process_attach (i_engine *engine_, + unsigned char peer_identity_size_, unsigned char *peer_identity_) +{ if (!peer_identity.empty ()) { - bool ok = owner->register_session (peer_identity.c_str (), this); - // There's already a session with the specified identity. - // We should syslog it and drop the session. TODO - zmq_assert (ok); + // If we already know the peer name do nothing, just check whether + // it haven't changed. + zmq_assert (peer_identity.size () == peer_identity_size_); + zmq_assert (memcmp (peer_identity.data (), peer_identity_, + peer_identity_size_) == 0); + } + else if (peer_identity_size_) { + + // Remember the peer identity. + peer_identity.assign ((char*) peer_identity_, peer_identity_size_); + + // If the session is not registered with the ordinal, let's register + // it using the peer name. + if (!ordinal) { + if (!owner->register_session (peer_identity_size_, peer_identity_, + this)) { + + // TODO: There's already a session with the specified + // identity. We should presumably syslog it and drop the + // session. + zmq_assert (false); + } + } } // If session is created by 'connect' function, it has the pipes set @@ -204,37 +257,8 @@ void zmq::session_t::process_plug () send_bind (owner, outbound ? &outbound->reader : NULL, inbound ? &inbound->writer : NULL); } -} - -void zmq::session_t::process_unplug () -{ - // Unregister the session from the socket. There's nothing to do here - // for transient sessions. - if (type == unnamed) - owner->unregister_session (ordinal); - else if (type == named) - owner->unregister_session (peer_identity.c_str ()); - // Ask associated pipes to terminate. - if (in_pipe) { - in_pipe->term (); - in_pipe = NULL; - } - if (out_pipe) { - out_pipe->term (); - out_pipe = NULL; - } - - if (engine) { - engine->unplug (); - delete engine; - engine = NULL; - } -} - -void zmq::session_t::process_attach (i_engine *engine_, - unsigned char peer_identity_size_, unsigned char *peer_identity_) -{ + // Plug in the engine. zmq_assert (!engine); zmq_assert (engine_); engine = engine_; -- cgit v1.2.3