summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-02-12 20:49:00 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-02-12 20:49:00 +0100
commit2e78e48503375a415d95ee8df80df9c065172abc (patch)
treef3a3dd6dfd4457f3446435bea3c0a850da701926
parentd8430f4b9a43bf8c99854298edc9f1bc35c0e8ec (diff)
Multi-hop REQ/REP, part V., peer identity is passed from init object to session
-rw-r--r--src/command.cpp10
-rw-r--r--src/command.hpp2
-rw-r--r--src/object.cpp22
-rw-r--r--src/object.hpp6
-rw-r--r--src/session.cpp3
-rw-r--r--src/session.hpp3
-rw-r--r--src/zmq_init.cpp3
7 files changed, 42 insertions, 7 deletions
diff --git a/src/command.cpp b/src/command.cpp
index fcb5729..7564fe2 100644
--- a/src/command.cpp
+++ b/src/command.cpp
@@ -17,8 +17,18 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <stdlib.h>
+
#include "command.hpp"
void zmq::deallocate_command (command_t *cmd_)
{
+ switch (cmd_->type) {
+ case command_t::attach:
+ if (cmd_->args.attach.peer_identity)
+ free (cmd_->args.attach.peer_identity);
+ break;
+ default:
+ /* noop */;
+ }
}
diff --git a/src/command.hpp b/src/command.hpp
index 976285e..6187b72 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -66,6 +66,8 @@ namespace zmq
// Attach the engine to the session.
struct {
struct i_engine *engine;
+ unsigned char peer_identity_size;
+ unsigned char *peer_identity;
} attach;
// Sent from session to socket to establish pipe(s) between them.
diff --git a/src/object.cpp b/src/object.cpp
index faa922e..73a17a3 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include "object.hpp"
#include "dispatcher.hpp"
#include "err.hpp"
@@ -80,7 +82,9 @@ void zmq::object_t::process_command (command_t &cmd_)
break;
case command_t::attach:
- process_attach (cmd_.args.attach.engine);
+ process_attach (cmd_.args.attach.engine,
+ cmd_.args.attach.peer_identity_size,
+ cmd_.args.attach.peer_identity);
process_seqnum ();
break;
@@ -180,6 +184,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
}
void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
+ unsigned char peer_identity_size_, unsigned char *peer_identity_,
bool inc_seqnum_)
{
if (inc_seqnum_)
@@ -189,6 +194,18 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
cmd.destination = destination_;
cmd.type = command_t::attach;
cmd.args.attach.engine = engine_;
+ if (!peer_identity_size_) {
+ cmd.args.attach.peer_identity_size = 0;
+ cmd.args.attach.peer_identity = NULL;
+ }
+ else {
+ cmd.args.attach.peer_identity_size = peer_identity_size_;
+ cmd.args.attach.peer_identity =
+ (unsigned char*) malloc (peer_identity_size_);
+ zmq_assert (cmd.args.attach.peer_identity_size);
+ memcpy (cmd.args.attach.peer_identity, peer_identity_,
+ peer_identity_size_);
+ }
send_command (cmd);
}
@@ -271,7 +288,8 @@ void zmq::object_t::process_own (owned_t *object_)
zmq_assert (false);
}
-void zmq::object_t::process_attach (i_engine *engine_)
+void zmq::object_t::process_attach (i_engine *engine_,
+ unsigned char peer_identity_size_, unsigned char *peer_identity_)
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index e6b2379..4c82a0d 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -64,7 +64,8 @@ namespace zmq
void send_own (class socket_base_t *destination_,
class owned_t *object_);
void send_attach (class session_t *destination_,
- struct i_engine *engine_, bool inc_seqnum_ = true);
+ struct i_engine *engine_, unsigned char peer_identity_size_,
+ unsigned char *peer_identity_, bool inc_seqnum_ = true);
void send_bind (class socket_base_t *destination_,
class reader_t *in_pipe_, class writer_t *out_pipe_,
bool inc_seqnum_ = true);
@@ -81,7 +82,8 @@ namespace zmq
virtual void process_stop ();
virtual void process_plug ();
virtual void process_own (class owned_t *object_);
- virtual void process_attach (struct i_engine *engine_);
+ virtual void process_attach (struct i_engine *engine_,
+ unsigned char peer_identity_size_, unsigned char *peer_identity_);
virtual void process_bind (class reader_t *in_pipe_,
class writer_t *out_pipe_);
virtual void process_revive ();
diff --git a/src/session.cpp b/src/session.cpp
index 1aece4d..07971e1 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -232,7 +232,8 @@ void zmq::session_t::process_unplug ()
}
}
-void zmq::session_t::process_attach (i_engine *engine_)
+void zmq::session_t::process_attach (i_engine *engine_,
+ unsigned char peer_identity_size_, unsigned char *peer_identity_)
{
zmq_assert (!engine);
zmq_assert (engine_);
diff --git a/src/session.hpp b/src/session.hpp
index 375d095..2c6b462 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -66,7 +66,8 @@ namespace zmq
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
- void process_attach (struct i_engine *engine_);
+ void process_attach (struct i_engine *engine_,
+ unsigned char peer_identity_size_, unsigned char *peer_identity_);
// Inbound pipe, i.e. one the session is getting messages from.
class reader_t *in_pipe;
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index b49baa9..6baa88f 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -192,7 +192,8 @@ void zmq::zmq_init_t::finalise ()
}
// No need to increment seqnum as it was laready incremented above.
- send_attach (session, engine, false);
+ send_attach (session, engine, (unsigned char) peer_identity.size (),
+ (unsigned char*) peer_identity.data (), false);
// Destroy the init object.
engine = NULL;