From 4405250d93a2ad6eb3940c4bc4fe8ea32bd52f9e Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 13 Feb 2010 15:30:03 +0100 Subject: Multi-hop REQ/REP, part IX., pass the peer identity as far as socket_base_t --- src/command.cpp | 4 ++++ src/command.hpp | 2 ++ src/object.cpp | 28 ++++++++++++++++++++++++---- src/object.hpp | 4 ++-- src/options.cpp | 2 +- src/options.hpp | 5 ++--- src/pgm_socket.cpp | 7 +++++-- src/session.cpp | 2 +- src/socket_base.cpp | 5 +++-- src/socket_base.hpp | 3 ++- 10 files changed, 46 insertions(+), 16 deletions(-) diff --git a/src/command.cpp b/src/command.cpp index 7564fe2..8bf7ea2 100644 --- a/src/command.cpp +++ b/src/command.cpp @@ -28,6 +28,10 @@ void zmq::deallocate_command (command_t *cmd_) if (cmd_->args.attach.peer_identity) free (cmd_->args.attach.peer_identity); break; + case command_t::bind: + if (cmd_->args.bind.peer_identity) + free (cmd_->args.bind.peer_identity); + break; default: /* noop */; } diff --git a/src/command.hpp b/src/command.hpp index 6187b72..150cad1 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -75,6 +75,8 @@ namespace zmq struct { class reader_t *in_pipe; class writer_t *out_pipe; + unsigned char peer_identity_size; + unsigned char *peer_identity; } bind; // Sent by pipe writer to inform dormant pipe reader that there diff --git a/src/object.cpp b/src/object.cpp index eaa67c9..356fcd1 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -89,7 +89,9 @@ void zmq::object_t::process_command (command_t &cmd_) break; case command_t::bind: - process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); + process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe, + blob_t (cmd_.args.bind.peer_identity, + cmd_.args.bind.peer_identity_size)); process_seqnum (); break; @@ -198,7 +200,9 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, cmd.args.attach.peer_identity = NULL; } else { - cmd.args.attach.peer_identity_size = peer_identity_.size (); + zmq_assert (peer_identity_.size () <= 0xff); + cmd.args.attach.peer_identity_size = + (unsigned char) peer_identity_.size (); cmd.args.attach.peer_identity = (unsigned char*) malloc (peer_identity_.size ()); zmq_assert (cmd.args.attach.peer_identity_size); @@ -209,7 +213,8 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, } void zmq::object_t::send_bind (socket_base_t *destination_, - reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_) + reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_, + bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -219,6 +224,20 @@ void zmq::object_t::send_bind (socket_base_t *destination_, cmd.type = command_t::bind; cmd.args.bind.in_pipe = in_pipe_; cmd.args.bind.out_pipe = out_pipe_; + if (peer_identity_.empty ()) { + cmd.args.bind.peer_identity_size = 0; + cmd.args.bind.peer_identity = NULL; + } + else { + zmq_assert (peer_identity_.size () <= 0xff); + cmd.args.bind.peer_identity_size = + (unsigned char) peer_identity_.size (); + cmd.args.bind.peer_identity = + (unsigned char*) malloc (peer_identity_.size ()); + zmq_assert (cmd.args.bind.peer_identity_size); + memcpy (cmd.args.bind.peer_identity, peer_identity_.data (), + peer_identity_.size ()); + } send_command (cmd); } @@ -293,7 +312,8 @@ void zmq::object_t::process_attach (i_engine *engine_, zmq_assert (false); } -void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, + const blob_t &peer_identity_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index d492695..1544109 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -69,7 +69,7 @@ namespace zmq bool inc_seqnum_ = true); void send_bind (class socket_base_t *destination_, class reader_t *in_pipe_, class writer_t *out_pipe_, - bool inc_seqnum_ = true); + const blob_t &peer_identity_, bool inc_seqnum_ = true); void send_revive (class object_t *destination_); void send_pipe_term (class writer_t *destination_); void send_pipe_term_ack (class reader_t *destination_); @@ -86,7 +86,7 @@ namespace zmq virtual void process_attach (struct i_engine *engine_, const blob_t &peer_identity_); virtual void process_bind (class reader_t *in_pipe_, - class writer_t *out_pipe_); + class writer_t *out_pipe_, const blob_t &peer_identity_); virtual void process_revive (); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); diff --git a/src/options.cpp b/src/options.cpp index cdfccc6..a70b9a3 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -76,7 +76,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, return 0; case ZMQ_IDENTITY: - identity.assign ((const char*) optval_, optvallen_); + identity.assign ((const unsigned char*) optval_, optvallen_); return 0; case ZMQ_RATE: diff --git a/src/options.hpp b/src/options.hpp index f9ff6e4..541e6e8 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -20,10 +20,9 @@ #ifndef __ZMQ_OPTIONS_HPP_INCLUDED__ #define __ZMQ_OPTIONS_HPP_INCLUDED__ -#include - #include "stddef.h" #include "stdint.hpp" +#include "blob.hpp" namespace zmq { @@ -38,7 +37,7 @@ namespace zmq int64_t lwm; int64_t swap; uint64_t affinity; - std::string identity; + blob_t identity; // Maximum tranfer rate [kb/s]. Default 100kb/s. uint32_t rate; diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 1eeb34f..462a3a9 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -89,8 +89,11 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (options.identity.size () > 0) { - // Create gsi from identity string. - gsi_base = options.identity; + // Create gsi from identity. + // TODO: We assume that identity is standard C string here. + // What if it contains binary zeroes? + gsi_base.assign ((const char*) options.identity.data (), + options.identity.size ()); } else { // Generate random gsi. diff --git a/src/session.cpp b/src/session.cpp index b2393d8..909501a 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -245,7 +245,7 @@ void zmq::session_t::process_attach (i_engine *engine_, } send_bind (owner, outbound ? &outbound->reader : NULL, - inbound ? &inbound->writer : NULL); + inbound ? &inbound->writer : NULL, peer_identity); } // Plug in the engine. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 7d90236..1d4eae6 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -171,7 +171,7 @@ int zmq::socket_base_t::connect (const char *addr_) // was incremented in find_endpoint function. The callee is notified // about the fact via the last parameter. send_bind (peer, out_pipe ? &out_pipe->reader : NULL, - in_pipe ? &in_pipe->writer : NULL, false); + in_pipe ? &in_pipe->writer : NULL, options.identity, false); return 0; } @@ -564,7 +564,8 @@ void zmq::socket_base_t::process_own (owned_t *object_) io_objects.insert (object_); } -void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, + const blob_t &peer_identity_) { attach_pipes (in_pipe_, out_pipe_); } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 39f09de..a1702a7 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -122,7 +122,8 @@ namespace zmq // Handlers for incoming commands. void process_own (class owned_t *object_); - void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_); + void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_, + const blob_t &peer_identity_); void process_term_req (class owned_t *object_); void process_term_ack (); void process_seqnum (); -- cgit v1.2.3