From 2e78e48503375a415d95ee8df80df9c065172abc Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 12 Feb 2010 20:49:00 +0100 Subject: Multi-hop REQ/REP, part V., peer identity is passed from init object to session --- src/command.cpp | 10 ++++++++++ src/command.hpp | 2 ++ src/object.cpp | 22 ++++++++++++++++++++-- src/object.hpp | 6 ++++-- src/session.cpp | 3 ++- src/session.hpp | 3 ++- src/zmq_init.cpp | 3 ++- 7 files changed, 42 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/command.cpp b/src/command.cpp index fcb5729..7564fe2 100644 --- a/src/command.cpp +++ b/src/command.cpp @@ -17,8 +17,18 @@ along with this program. If not, see . */ +#include + #include "command.hpp" void zmq::deallocate_command (command_t *cmd_) { + switch (cmd_->type) { + case command_t::attach: + if (cmd_->args.attach.peer_identity) + free (cmd_->args.attach.peer_identity); + break; + default: + /* noop */; + } } diff --git a/src/command.hpp b/src/command.hpp index 976285e..6187b72 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -66,6 +66,8 @@ namespace zmq // Attach the engine to the session. struct { struct i_engine *engine; + unsigned char peer_identity_size; + unsigned char *peer_identity; } attach; // Sent from session to socket to establish pipe(s) between them. diff --git a/src/object.cpp b/src/object.cpp index faa922e..73a17a3 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -17,6 +17,8 @@ along with this program. If not, see . */ +#include + #include "object.hpp" #include "dispatcher.hpp" #include "err.hpp" @@ -80,7 +82,9 @@ void zmq::object_t::process_command (command_t &cmd_) break; case command_t::attach: - process_attach (cmd_.args.attach.engine); + process_attach (cmd_.args.attach.engine, + cmd_.args.attach.peer_identity_size, + cmd_.args.attach.peer_identity); process_seqnum (); break; @@ -180,6 +184,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) } void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, + unsigned char peer_identity_size_, unsigned char *peer_identity_, bool inc_seqnum_) { if (inc_seqnum_) @@ -189,6 +194,18 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, cmd.destination = destination_; cmd.type = command_t::attach; cmd.args.attach.engine = engine_; + if (!peer_identity_size_) { + cmd.args.attach.peer_identity_size = 0; + cmd.args.attach.peer_identity = NULL; + } + else { + cmd.args.attach.peer_identity_size = peer_identity_size_; + cmd.args.attach.peer_identity = + (unsigned char*) malloc (peer_identity_size_); + zmq_assert (cmd.args.attach.peer_identity_size); + memcpy (cmd.args.attach.peer_identity, peer_identity_, + peer_identity_size_); + } send_command (cmd); } @@ -271,7 +288,8 @@ void zmq::object_t::process_own (owned_t *object_) zmq_assert (false); } -void zmq::object_t::process_attach (i_engine *engine_) +void zmq::object_t::process_attach (i_engine *engine_, + unsigned char peer_identity_size_, unsigned char *peer_identity_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index e6b2379..4c82a0d 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -64,7 +64,8 @@ namespace zmq void send_own (class socket_base_t *destination_, class owned_t *object_); void send_attach (class session_t *destination_, - struct i_engine *engine_, bool inc_seqnum_ = true); + struct i_engine *engine_, unsigned char peer_identity_size_, + unsigned char *peer_identity_, bool inc_seqnum_ = true); void send_bind (class socket_base_t *destination_, class reader_t *in_pipe_, class writer_t *out_pipe_, bool inc_seqnum_ = true); @@ -81,7 +82,8 @@ namespace zmq virtual void process_stop (); virtual void process_plug (); virtual void process_own (class owned_t *object_); - virtual void process_attach (struct i_engine *engine_); + virtual void process_attach (struct i_engine *engine_, + unsigned char peer_identity_size_, unsigned char *peer_identity_); virtual void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_); virtual void process_revive (); diff --git a/src/session.cpp b/src/session.cpp index 1aece4d..07971e1 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -232,7 +232,8 @@ void zmq::session_t::process_unplug () } } -void zmq::session_t::process_attach (i_engine *engine_) +void zmq::session_t::process_attach (i_engine *engine_, + unsigned char peer_identity_size_, unsigned char *peer_identity_) { zmq_assert (!engine); zmq_assert (engine_); diff --git a/src/session.hpp b/src/session.hpp index 375d095..2c6b462 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -66,7 +66,8 @@ namespace zmq // Handlers for incoming commands. void process_plug (); void process_unplug (); - void process_attach (struct i_engine *engine_); + void process_attach (struct i_engine *engine_, + unsigned char peer_identity_size_, unsigned char *peer_identity_); // Inbound pipe, i.e. one the session is getting messages from. class reader_t *in_pipe; diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index b49baa9..6baa88f 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -192,7 +192,8 @@ void zmq::zmq_init_t::finalise () } // No need to increment seqnum as it was laready incremented above. - send_attach (session, engine, false); + send_attach (session, engine, (unsigned char) peer_identity.size (), + (unsigned char*) peer_identity.data (), false); // Destroy the init object. engine = NULL; -- cgit v1.2.3