diff options
Diffstat (limited to 'src/zmq_init.cpp')
-rw-r--r-- | src/zmq_init.cpp | 53 |
1 files changed, 24 insertions, 29 deletions
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index 9492caa..3e76cb9 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -17,10 +17,13 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <string.h> + #include "zmq_init.hpp" #include "zmq_engine.hpp" #include "io_thread.hpp" #include "session.hpp" +#include "uuid.hpp" #include "err.hpp" zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_, @@ -71,17 +74,21 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) if (received) return false; - // Retreieve the remote identity. - peer_identity.assign ((const char*) zmq_msg_data (msg_), - zmq_msg_size (msg_)); + // Retreieve the remote identity. If it's empty, generate a unique name. + if (!zmq_msg_size (msg_)) { + unsigned char identity [uuid_t::uuid_string_len + 1]; + identity [0] = 0; + memcpy (identity + 1, uuid_t ().to_string (), uuid_t::uuid_string_len); + peer_identity.assign (identity, uuid_t::uuid_string_len + 1); + } + else { + peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_), + zmq_msg_size (msg_)); + } + if (options.traceroute) + engine->add_prefix (peer_identity); received = true; - // Once the initial handshaking is over, XREP sockets should start - // tracerouting individual messages. - if (options.type == ZMQ_XREP) - engine->traceroute ((unsigned char*) peer_identity.data (), - peer_identity.size ()); - return true; } @@ -160,15 +167,16 @@ void zmq::zmq_init_t::finalise () return; } } + else { - // If the peer has a unique name, find the associated session. If it - // doesn't exist, create it. - else if (!peer_identity.empty ()) { - session = owner->find_session (peer_identity.c_str ()); + // If the peer has a unique name, find the associated session. + // If it does not exist, create it. + zmq_assert (!peer_identity.empty ()); + session = owner->find_session (peer_identity); if (!session) { session = new (std::nothrow) session_t ( choose_io_thread (options.affinity), owner, options, - peer_identity.c_str ()); + peer_identity); zmq_assert (session); send_plug (session); send_own (owner, session); @@ -178,21 +186,8 @@ void zmq::zmq_init_t::finalise () } } - // If the other party has no specific identity, let's create a - // transient session. - else { - session = new (std::nothrow) session_t ( - choose_io_thread (options.affinity), owner, options, NULL); - zmq_assert (session); - send_plug (session); - send_own (owner, session); - - // Reserve a sequence number for following 'attach' command. - session->inc_seqnum (); - } - - // No need to increment seqnum as it was laready incremented above. - send_attach (session, engine, false); + // No need to increment seqnum as it was already incremented above. + send_attach (session, engine, peer_identity, false); // Destroy the init object. engine = NULL; |