summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-02-13 14:07:30 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-02-13 14:07:30 +0100
commitf5ce81f2893ec0707c2f4346740878e68b51e13a (patch)
tree88e0a375594c06879b62668a81a2e2250c23a2ac
parentcdc2efe9b5f0d1f45065b1c32e5eabd7e9f78a12 (diff)
Multi-hop REQ/REP, part VIII., new blob_t type used for holding identity
-rw-r--r--src/Makefile.am1
-rw-r--r--src/blob.hpp33
-rw-r--r--src/i_engine.hpp5
-rw-r--r--src/object.cpp19
-rw-r--r--src/object.hpp7
-rw-r--r--src/pgm_receiver.cpp3
-rw-r--r--src/pgm_receiver.hpp2
-rw-r--r--src/pgm_sender.cpp3
-rw-r--r--src/pgm_sender.hpp2
-rw-r--r--src/session.cpp32
-rw-r--r--src/session.hpp13
-rw-r--r--src/socket_base.cpp24
-rw-r--r--src/socket_base.hpp14
-rw-r--r--src/zmq_decoder.cpp34
-rw-r--r--src/zmq_decoder.hpp6
-rw-r--r--src/zmq_engine.cpp5
-rw-r--r--src/zmq_engine.hpp2
-rw-r--r--src/zmq_init.cpp17
-rw-r--r--src/zmq_init.hpp5
19 files changed, 113 insertions, 114 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 32f088e..f19c0f6 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -57,6 +57,7 @@ libzmq_la_SOURCES = app_thread.hpp \
atomic_bitmap.hpp \
atomic_counter.hpp \
atomic_ptr.hpp \
+ blob.hpp \
command.hpp \
config.hpp \
decoder.hpp \
diff --git a/src/blob.hpp b/src/blob.hpp
new file mode 100644
index 0000000..a4fa8cd
--- /dev/null
+++ b/src/blob.hpp
@@ -0,0 +1,33 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_BLOB_HPP_INCLUDED__
+#define __ZMQ_BLOB_HPP_INCLUDED__
+
+#include <string>
+
+namespace zmq
+{
+
+ // Object to hold dynamically allocated opaque binary data.
+ typedef std::basic_string <unsigned char> blob_t;
+
+}
+
+#endif
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index bcb4297..d64027d 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -22,6 +22,8 @@
#include <stddef.h>
+#include "blob.hpp"
+
namespace zmq
{
@@ -42,8 +44,7 @@ namespace zmq
// Start tracing the message route. Engine should add the identity
// supplied to all inbound messages and trim identity from all the
// outbound messages.
- virtual void traceroute (unsigned char *identity_,
- size_t identity_size_) = 0;
+ virtual void traceroute (const blob_t &identity_) = 0;
};
}
diff --git a/src/object.cpp b/src/object.cpp
index 73a17a3..eaa67c9 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -83,8 +83,8 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::attach:
process_attach (cmd_.args.attach.engine,
- cmd_.args.attach.peer_identity_size,
- cmd_.args.attach.peer_identity);
+ blob_t (cmd_.args.attach.peer_identity,
+ cmd_.args.attach.peer_identity_size));
process_seqnum ();
break;
@@ -184,8 +184,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
}
void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
- unsigned char peer_identity_size_, unsigned char *peer_identity_,
- bool inc_seqnum_)
+ const blob_t &peer_identity_, bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -194,17 +193,17 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
cmd.destination = destination_;
cmd.type = command_t::attach;
cmd.args.attach.engine = engine_;
- if (!peer_identity_size_) {
+ if (peer_identity_.empty ()) {
cmd.args.attach.peer_identity_size = 0;
cmd.args.attach.peer_identity = NULL;
}
else {
- cmd.args.attach.peer_identity_size = peer_identity_size_;
+ cmd.args.attach.peer_identity_size = peer_identity_.size ();
cmd.args.attach.peer_identity =
- (unsigned char*) malloc (peer_identity_size_);
+ (unsigned char*) malloc (peer_identity_.size ());
zmq_assert (cmd.args.attach.peer_identity_size);
- memcpy (cmd.args.attach.peer_identity, peer_identity_,
- peer_identity_size_);
+ memcpy (cmd.args.attach.peer_identity, peer_identity_.data (),
+ peer_identity_.size ());
}
send_command (cmd);
}
@@ -289,7 +288,7 @@ void zmq::object_t::process_own (owned_t *object_)
}
void zmq::object_t::process_attach (i_engine *engine_,
- unsigned char peer_identity_size_, unsigned char *peer_identity_)
+ const blob_t &peer_identity_)
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index 4c82a0d..d492695 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -21,6 +21,7 @@
#define __ZMQ_OBJECT_HPP_INCLUDED__
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -64,8 +65,8 @@ namespace zmq
void send_own (class socket_base_t *destination_,
class owned_t *object_);
void send_attach (class session_t *destination_,
- struct i_engine *engine_, unsigned char peer_identity_size_,
- unsigned char *peer_identity_, bool inc_seqnum_ = true);
+ struct i_engine *engine_, const blob_t &peer_identity_,
+ bool inc_seqnum_ = true);
void send_bind (class socket_base_t *destination_,
class reader_t *in_pipe_, class writer_t *out_pipe_,
bool inc_seqnum_ = true);
@@ -83,7 +84,7 @@ namespace zmq
virtual void process_plug ();
virtual void process_own (class owned_t *object_);
virtual void process_attach (struct i_engine *engine_,
- unsigned char peer_identity_size_, unsigned char *peer_identity_);
+ const blob_t &peer_identity_);
virtual void process_bind (class reader_t *in_pipe_,
class writer_t *out_pipe_);
virtual void process_revive ();
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index a2ba9c6..b7ca327 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -88,8 +88,7 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
-void zmq::pgm_receiver_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::pgm_receiver_t::traceroute (const blob_t &identity_)
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index f03551f..b01e5b0 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -54,7 +54,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void traceroute (const blob_t &identity_);
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index fa7d7e0..acbc3fb 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -102,8 +102,7 @@ void zmq::pgm_sender_t::revive ()
out_event ();
}
-void zmq::pgm_sender_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::pgm_sender_t::traceroute (const blob_t &identity_)
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 89357f5..7041610 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -52,7 +52,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void traceroute (const blob_t &identity_);
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/session.cpp b/src/session.cpp
index 1fab3c2..b2393d8 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -40,24 +40,18 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
}
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
- const options_t &options_, unsigned char peer_identity_size_,
- unsigned char *peer_identity_) :
+ const options_t &options_, const blob_t &peer_identity_) :
owned_t (parent_, owner_),
in_pipe (NULL),
active (true),
out_pipe (NULL),
engine (NULL),
ordinal (0),
+ peer_identity (peer_identity_),
options (options_)
{
-
-if (!peer_identity_size_)
-
- // If peer identity is not supplied, leave it empty.
- if (peer_identity_size_) {
- peer_identity.assign ((char*) peer_identity_, peer_identity_size_);
- if (!owner->register_session (peer_identity_size_, peer_identity_,
- this)) {
+ if (!peer_identity.empty ()) {
+ if (!owner->register_session (peer_identity, this)) {
// TODO: There's already a session with the specified
// identity. We should presumably syslog it and drop the
@@ -180,8 +174,7 @@ void zmq::session_t::process_unplug ()
if (ordinal)
owner->unregister_session (ordinal);
else if (!peer_identity.empty ())
- owner->unregister_session ((unsigned char) peer_identity.size (),
- (unsigned char*) peer_identity.data ());
+ owner->unregister_session (peer_identity);
// Ask associated pipes to terminate.
if (in_pipe) {
@@ -201,26 +194,23 @@ void zmq::session_t::process_unplug ()
}
void zmq::session_t::process_attach (i_engine *engine_,
- unsigned char peer_identity_size_, unsigned char *peer_identity_)
+ const blob_t &peer_identity_)
{
if (!peer_identity.empty ()) {
// If we already know the peer name do nothing, just check whether
// it haven't changed.
- zmq_assert (peer_identity.size () == peer_identity_size_);
- zmq_assert (memcmp (peer_identity.data (), peer_identity_,
- peer_identity_size_) == 0);
+ zmq_assert (peer_identity == peer_identity_);
}
- else if (peer_identity_size_) {
+ else if (!peer_identity_.empty ()) {
- // Remember the peer identity.
- peer_identity.assign ((char*) peer_identity_, peer_identity_size_);
+ // Store the peer identity.
+ peer_identity = peer_identity_;
// If the session is not registered with the ordinal, let's register
// it using the peer name.
if (!ordinal) {
- if (!owner->register_session (peer_identity_size_, peer_identity_,
- this)) {
+ if (!owner->register_session (peer_identity, this)) {
// TODO: There's already a session with the specified
// identity. We should presumably syslog it and drop the
diff --git a/src/session.hpp b/src/session.hpp
index 7607cfb..d412728 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -20,12 +20,11 @@
#ifndef __ZMQ_SESSION_HPP_INCLUDED__
#define __ZMQ_SESSION_HPP_INCLUDED__
-#include <string>
-
#include "i_inout.hpp"
#include "i_endpoint.hpp"
#include "owned.hpp"
#include "options.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -38,11 +37,9 @@ namespace zmq
session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_);
- // Creates named session. If name is NULL, transient session with
- // auto-generated name is created.
+ // Creates named session.
session_t (object_t *parent_, socket_base_t *owner_,
- const options_t &options_, unsigned char peer_identity_size_,
- unsigned char *peer_identity_);
+ const options_t &options_, const blob_t &peer_identity_);
// i_inout interface implementation.
bool read (::zmq_msg_t *msg_);
@@ -68,7 +65,7 @@ namespace zmq
void process_plug ();
void process_unplug ();
void process_attach (struct i_engine *engine_,
- unsigned char peer_identity_size_, unsigned char *peer_identity_);
+ const blob_t &peer_identity_);
// Inbound pipe, i.e. one the session is getting messages from.
class reader_t *in_pipe;
@@ -87,7 +84,7 @@ namespace zmq
uint64_t ordinal;
// Identity of the peer.
- std::string peer_identity;
+ blob_t peer_identity;
// Inherited socket options.
options_t options;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 4af69a0..7d90236 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -267,7 +267,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
- send_attach (session, pgm_sender, 0, NULL);
+ send_attach (session, pgm_sender, blob_t ());
}
else if (options.requires_in) {
@@ -282,7 +282,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
- send_attach (session, pgm_receiver, 0, NULL);
+ send_attach (session, pgm_receiver, blob_t ());
}
else
zmq_assert (false);
@@ -454,33 +454,29 @@ bool zmq::socket_base_t::has_out ()
return xhas_out ();
}
-bool zmq::socket_base_t::register_session (unsigned char peer_identity_size_,
- unsigned char *peer_identity_, session_t *session_)
+bool zmq::socket_base_t::register_session (const blob_t &peer_identity_,
+ session_t *session_)
{
sessions_sync.lock ();
- bool registered = named_sessions.insert (std::make_pair (std::string (
- (char*) peer_identity_, peer_identity_size_), session_)).second;
+ bool registered = named_sessions.insert (
+ std::make_pair (peer_identity_, session_)).second;
sessions_sync.unlock ();
return registered;
}
-void zmq::socket_base_t::unregister_session (unsigned char peer_identity_size_,
- unsigned char *peer_identity_)
+void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_)
{
sessions_sync.lock ();
- named_sessions_t::iterator it = named_sessions.find (std::string (
- (char*) peer_identity_, peer_identity_size_));
+ named_sessions_t::iterator it = named_sessions.find (peer_identity_);
zmq_assert (it != named_sessions.end ());
named_sessions.erase (it);
sessions_sync.unlock ();
}
-zmq::session_t *zmq::socket_base_t::find_session (
- unsigned char peer_identity_size_, unsigned char *peer_identity_)
+zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_)
{
sessions_sync.lock ();
- named_sessions_t::iterator it = named_sessions.find (std::string (
- (char*) peer_identity_, peer_identity_size_));
+ named_sessions_t::iterator it = named_sessions.find (peer_identity_);
if (it == named_sessions.end ()) {
sessions_sync.unlock ();
return NULL;
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index a2878ea..39f09de 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -23,7 +23,6 @@
#include <set>
#include <map>
#include <vector>
-#include <string>
#include "../bindings/c/zmq.h"
@@ -35,6 +34,7 @@
#include "stdint.hpp"
#include "atomic_counter.hpp"
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -78,12 +78,10 @@ namespace zmq
// There are two distinct types of sessions: those identified by name
// and those identified by ordinal number. Thus two sets of session
// management functions.
- bool register_session (unsigned char peer_identity_size_,
- unsigned char *peer_identity_, class session_t *session_);
- void unregister_session (unsigned char peer_identity_size_,
- unsigned char *peer_identity_);
- class session_t *find_session (unsigned char peer_identity_size_,
- unsigned char *peer_identity_);
+ bool register_session (const blob_t &peer_identity_,
+ class session_t *session_);
+ void unregister_session (const blob_t &peer_identity_);
+ class session_t *find_session (const blob_t &peer_identity_);
uint64_t register_session (class session_t *session_);
void unregister_session (uint64_t ordinal_);
class session_t *find_session (uint64_t ordinal_);
@@ -158,7 +156,7 @@ namespace zmq
// within the socket, instead they are used by I/O objects owned by
// the socket. As those objects can live in different threads,
// the access is synchronised by mutex.
- typedef std::map <std::string, session_t*> named_sessions_t;
+ typedef std::map <blob_t, session_t*> named_sessions_t;
named_sessions_t named_sessions;
typedef std::map <uint64_t, session_t*> unnamed_sessions_t;
unnamed_sessions_t unnamed_sessions;
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
index f502ffd..20e07bc 100644
--- a/src/zmq_decoder.cpp
+++ b/src/zmq_decoder.cpp
@@ -27,9 +27,7 @@
zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
decoder_t <zmq_decoder_t> (bufsize_),
- destination (NULL),
- prefix (NULL),
- prefix_size (0)
+ destination (NULL)
{
zmq_msg_init (&in_progress);
@@ -39,9 +37,6 @@ zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
zmq::zmq_decoder_t::~zmq_decoder_t ()
{
- if (prefix)
- free (prefix);
-
zmq_msg_close (&in_progress);
}
@@ -50,13 +45,9 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_)
destination = destination_;
}
-void zmq::zmq_decoder_t::add_prefix (unsigned char *prefix_,
- size_t prefix_size_)
+void zmq::zmq_decoder_t::add_prefix (const blob_t &prefix_)
{
- prefix = malloc (prefix_size_);
- zmq_assert (prefix);
- memcpy (prefix, prefix_, prefix_size_);
- prefix_size = prefix_size_;
+ prefix = prefix_;
}
bool zmq::zmq_decoder_t::one_byte_size_ready ()
@@ -72,15 +63,16 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- int rc = zmq_msg_init_size (&in_progress, prefix_size + *tmpbuf);
+ int rc = zmq_msg_init_size (&in_progress, prefix.size () + *tmpbuf);
errno_assert (rc == 0);
// Fill in the message prefix if any.
- if (prefix)
- memcpy (zmq_msg_data (&in_progress), prefix, prefix_size);
+ if (!prefix.empty ())
+ memcpy (zmq_msg_data (&in_progress), prefix.data (),
+ prefix.size ());
- next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size,
- *tmpbuf, &zmq_decoder_t::message_ready);
+ next_step ((unsigned char*) zmq_msg_data (&in_progress) +
+ prefix.size (), *tmpbuf, &zmq_decoder_t::message_ready);
}
return true;
}
@@ -95,14 +87,14 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- int rc = zmq_msg_init_size (&in_progress, prefix_size + size);
+ int rc = zmq_msg_init_size (&in_progress, prefix.size () + size);
errno_assert (rc == 0);
// Fill in the message prefix if any.
- if (prefix)
- memcpy (zmq_msg_data (&in_progress), prefix, prefix_size);
+ if (!prefix.empty ())
+ memcpy (zmq_msg_data (&in_progress), prefix.data (), prefix.size ());
- next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size ,
+ next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix.size (),
size, &zmq_decoder_t::message_ready);
return true;
}
diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp
index dfabece..11ee6c2 100644
--- a/src/zmq_decoder.hpp
+++ b/src/zmq_decoder.hpp
@@ -23,6 +23,7 @@
#include "../bindings/c/zmq.h"
#include "decoder.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -41,7 +42,7 @@ namespace zmq
// Once called, all decoded messages will be prefixed by the specified
// prefix.
- void add_prefix (unsigned char *prefix_, size_t prefix_size_);
+ void add_prefix (const blob_t &prefix_);
private:
@@ -53,8 +54,7 @@ namespace zmq
unsigned char tmpbuf [8];
::zmq_msg_t in_progress;
- void *prefix;
- size_t prefix_size;
+ blob_t prefix;
zmq_decoder_t (const zmq_decoder_t&);
void operator = (const zmq_decoder_t&);
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index bda098c..75f3441 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -160,11 +160,10 @@ void zmq::zmq_engine_t::revive ()
out_event ();
}
-void zmq::zmq_engine_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::zmq_engine_t::traceroute (const blob_t &identity_)
{
encoder.trim_prefix ();
- decoder.add_prefix (identity_, identity_size_);
+ decoder.add_prefix (identity_);
}
void zmq::zmq_engine_t::error ()
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index 174dd1a..8657e8e 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -47,7 +47,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void traceroute (const blob_t &identity_);
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index f062ede..9aebad0 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -72,15 +72,14 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
return false;
// Retreieve the remote identity.
- peer_identity.assign ((const char*) zmq_msg_data (msg_),
+ peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
received = true;
// Once the initial handshaking is over, XREP sockets should start
// tracerouting individual messages.
if (options.traceroute)
- engine->traceroute ((unsigned char*) peer_identity.data (),
- peer_identity.size ());
+ engine->traceroute (peer_identity);
return true;
}
@@ -164,14 +163,11 @@ void zmq::zmq_init_t::finalise ()
// If the peer has a unique name, find the associated session. If it
// doesn't exist, create it.
else if (!peer_identity.empty ()) {
- session = owner->find_session (
- (unsigned char) peer_identity.size (),
- (unsigned char*) peer_identity.data ());
+ session = owner->find_session (peer_identity);
if (!session) {
session = new (std::nothrow) session_t (
choose_io_thread (options.affinity), owner, options,
- (unsigned char) peer_identity.size (),
- (unsigned char*) peer_identity.c_str ());
+ peer_identity);
zmq_assert (session);
send_plug (session);
send_own (owner, session);
@@ -185,7 +181,7 @@ void zmq::zmq_init_t::finalise ()
// transient session.
else {
session = new (std::nothrow) session_t (
- choose_io_thread (options.affinity), owner, options, 0, NULL);
+ choose_io_thread (options.affinity), owner, options, blob_t ());
zmq_assert (session);
send_plug (session);
send_own (owner, session);
@@ -195,8 +191,7 @@ void zmq::zmq_init_t::finalise ()
}
// No need to increment seqnum as it was already incremented above.
- send_attach (session, engine, (unsigned char) peer_identity.size (),
- (unsigned char*) peer_identity.data (), false);
+ send_attach (session, engine, peer_identity, false);
// Destroy the init object.
engine = NULL;
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index df14293..6f935c2 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -20,8 +20,6 @@
#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
-#include <string>
-
#include "i_inout.hpp"
#include "i_engine.hpp"
#include "owned.hpp"
@@ -29,6 +27,7 @@
#include "stdint.hpp"
#include "options.hpp"
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -72,7 +71,7 @@ namespace zmq
bool received;
// Identity of the peer socket.
- std::string peer_identity;
+ blob_t peer_identity;
// TCP connecter creates session before the name of the peer is known.
// Thus we know only its ordinal number.