From 75f571c8844231f4172f131e1dd6ba2348eb54e5 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 19 Feb 2010 15:24:43 +0100 Subject: Multi-hop REQ/REP, part XII., generate unique identities for anonymous connections --- doc/zmq_setsockopt.txt | 10 ++++++---- src/options.cpp | 9 +++++++++ src/session.cpp | 6 +++--- src/uuid.hpp | 6 +++--- src/zmq_encoder.cpp | 1 - src/zmq_engine.cpp | 4 +++- src/zmq_init.cpp | 40 ++++++++++++++++++++-------------------- 7 files changed, 44 insertions(+), 32 deletions(-) diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 549a2de..629bffc 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -60,9 +60,11 @@ If the socket has no identity, each run of the application is completely separated from other runs. However, with identity application reconnects to existing infrastructure left by the previous run. Thus it may receive messages that were sent in the meantime, it shares pipe limits with the -previous run etc. +previous run etc. Identity should be at least one byte and at most 255 bytes +long. Identities starting with binary zero are reserver for use by 0MQ +infrastructure. + -Type: string Unit: N/A Default: NULL +Type: BLOB Unit: N/A Default: NULL *ZMQ_SUBSCRIBE*:: Applicable only to ZMQ_SUB socket type. It establishes new message filter. @@ -72,7 +74,7 @@ beginning with specific prefix (e.g. "animals.mammals.dogs."). Multiple filters can be attached to a single 'sub' socket. In that case message passes if it matches at least one of the filters. + -Type: string Unit: N/A Default: N/A +Type: BLOB Unit: N/A Default: N/A *ZMQ_UNSUBSCRIBE*:: Applicable only to ZMQ_SUB socket type. Removes existing message filter. @@ -81,7 +83,7 @@ exactly. If there were several instances of the same filter created, this options removes only one of them, leaving the rest in place and functional. + -Type: string Unit: N/A Default: N/A +Type: BLOB Unit: N/A Default: N/A *ZMQ_RATE*:: This option applies only to sending side of multicast transports (pgm & udp). diff --git a/src/options.cpp b/src/options.cpp index b77af24..f78d8de 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -77,6 +77,15 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, return 0; case ZMQ_IDENTITY: + + // Empty identity is invalid as well as identity longer than + // 255 bytes. Identity starting with binary zero is invalid + // as these are used for auto-generated identities. + if (optvallen_ < 1 || optvallen_ > 255 || + *((const unsigned char*) optval_) == 0) { + errno = EINVAL; + return -1; + } identity.assign ((const unsigned char*) optval_, optvallen_); return 0; diff --git a/src/session.cpp b/src/session.cpp index 74bd8ae..05f319c 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -50,7 +50,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, peer_identity (peer_identity_), options (options_) { - if (!peer_identity.empty ()) { + if (!peer_identity.empty () && peer_identity [0] != 0) { if (!owner->register_session (peer_identity, this)) { // TODO: There's already a session with the specified @@ -103,7 +103,7 @@ void zmq::session_t::detach (owned_t *reconnecter_) engine = NULL; // Terminate transient session. - if (!ordinal && peer_identity.empty ()) + if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0)) term (); } @@ -173,7 +173,7 @@ void zmq::session_t::process_unplug () // Unregister the session from the socket. if (ordinal) owner->unregister_session (ordinal); - else if (!peer_identity.empty ()) + else if (!peer_identity.empty () && peer_identity [0] != 0) owner->unregister_session (peer_identity); // Ask associated pipes to terminate. diff --git a/src/uuid.hpp b/src/uuid.hpp index 001ea94..f565f8d 100644 --- a/src/uuid.hpp +++ b/src/uuid.hpp @@ -44,6 +44,9 @@ namespace zmq uuid_t (); ~uuid_t (); + // The length of textual representation of UUID. + enum { uuid_string_len = 36 }; + // Returns a pointer to buffer containing the textual // representation of the UUID. The caller is reponsible to // free the allocated memory. @@ -51,9 +54,6 @@ namespace zmq private: - // The length of textual representation of UUID. - enum { uuid_string_len = 36 }; - #if defined ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_MINGW32 typedef unsigned char* RPC_CSTR; diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index 5fca182..68626fa 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -89,7 +89,6 @@ bool zmq::zmq_encoder_t::message_ready () size -= prefix_size; } - // For messages less than 255 bytes long, write one byte of message size. // For longer messages write 0xff escape character followed by 8-byte // message size. diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 152daf6..623ca63 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -17,6 +17,8 @@ along with this program. If not, see . */ +#include + #include #include "zmq_engine.hpp" @@ -161,7 +163,7 @@ void zmq::zmq_engine_t::revive () } void zmq::zmq_engine_t::add_prefix (const blob_t &identity_) -{ +{ decoder.add_prefix (identity_); } diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index 7c5588f..3e76cb9 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -17,10 +17,13 @@ along with this program. If not, see . */ +#include + #include "zmq_init.hpp" #include "zmq_engine.hpp" #include "io_thread.hpp" #include "session.hpp" +#include "uuid.hpp" #include "err.hpp" zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_, @@ -71,10 +74,19 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) if (received) return false; - // Retreieve the remote identity. - peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_), - zmq_msg_size (msg_)); - engine->add_prefix (peer_identity); + // Retreieve the remote identity. If it's empty, generate a unique name. + if (!zmq_msg_size (msg_)) { + unsigned char identity [uuid_t::uuid_string_len + 1]; + identity [0] = 0; + memcpy (identity + 1, uuid_t ().to_string (), uuid_t::uuid_string_len); + peer_identity.assign (identity, uuid_t::uuid_string_len + 1); + } + else { + peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_), + zmq_msg_size (msg_)); + } + if (options.traceroute) + engine->add_prefix (peer_identity); received = true; return true; @@ -155,10 +167,11 @@ void zmq::zmq_init_t::finalise () return; } } + else { - // If the peer has a unique name, find the associated session. If it - // doesn't exist, create it. - else if (!peer_identity.empty ()) { + // If the peer has a unique name, find the associated session. + // If it does not exist, create it. + zmq_assert (!peer_identity.empty ()); session = owner->find_session (peer_identity); if (!session) { session = new (std::nothrow) session_t ( @@ -173,19 +186,6 @@ void zmq::zmq_init_t::finalise () } } - // If the other party has no specific identity, let's create a - // transient session. - else { - session = new (std::nothrow) session_t ( - choose_io_thread (options.affinity), owner, options, blob_t ()); - zmq_assert (session); - send_plug (session); - send_own (owner, session); - - // Reserve a sequence number for following 'attach' command. - session->inc_seqnum (); - } - // No need to increment seqnum as it was already incremented above. send_attach (session, engine, peer_identity, false); -- cgit v1.2.3