summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/zmq_setsockopt.txt10
-rw-r--r--src/Makefile.am2
-rw-r--r--src/blob.hpp33
-rw-r--r--src/command.cpp38
-rw-r--r--src/command.hpp7
-rw-r--r--src/dispatcher.cpp4
-rw-r--r--src/downstream.cpp3
-rw-r--r--src/downstream.hpp3
-rw-r--r--src/i_endpoint.hpp4
-rw-r--r--src/i_engine.hpp12
-rw-r--r--src/object.cpp67
-rw-r--r--src/object.hpp11
-rw-r--r--src/options.cpp16
-rw-r--r--src/options.hpp17
-rw-r--r--src/p2p.cpp3
-rw-r--r--src/p2p.hpp3
-rw-r--r--src/pgm_receiver.cpp9
-rw-r--r--src/pgm_receiver.hpp3
-rw-r--r--src/pgm_sender.cpp9
-rw-r--r--src/pgm_sender.hpp3
-rw-r--r--src/pgm_socket.cpp7
-rw-r--r--src/pub.cpp3
-rw-r--r--src/pub.hpp3
-rw-r--r--src/rep.cpp8
-rw-r--r--src/rep.hpp3
-rw-r--r--src/req.cpp3
-rw-r--r--src/req.hpp3
-rw-r--r--src/session.cpp134
-rw-r--r--src/session.hpp29
-rw-r--r--src/socket_base.cpp87
-rw-r--r--src/socket_base.hpp19
-rw-r--r--src/sub.cpp3
-rw-r--r--src/sub.hpp3
-rw-r--r--src/upstream.cpp3
-rw-r--r--src/upstream.hpp3
-rw-r--r--src/uuid.hpp6
-rw-r--r--src/xrep.cpp60
-rw-r--r--src/xrep.hpp10
-rw-r--r--src/xreq.cpp3
-rw-r--r--src/xreq.hpp3
-rw-r--r--src/zmq_decoder.cpp62
-rw-r--r--src/zmq_decoder.hpp6
-rw-r--r--src/zmq_encoder.cpp14
-rw-r--r--src/zmq_engine.cpp11
-rw-r--r--src/zmq_engine.hpp3
-rw-r--r--src/zmq_init.cpp53
-rw-r--r--src/zmq_init.hpp5
47 files changed, 534 insertions, 270 deletions
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index 549a2de..629bffc 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -60,9 +60,11 @@ If the socket has no identity, each run of the application is completely
separated from other runs. However, with identity application reconnects to
existing infrastructure left by the previous run. Thus it may receive
messages that were sent in the meantime, it shares pipe limits with the
-previous run etc.
+previous run etc. Identity should be at least one byte and at most 255 bytes
+long. Identities starting with binary zero are reserver for use by 0MQ
+infrastructure.
+
-Type: string Unit: N/A Default: NULL
+Type: BLOB Unit: N/A Default: NULL
*ZMQ_SUBSCRIBE*::
Applicable only to ZMQ_SUB socket type. It establishes new message filter.
@@ -72,7 +74,7 @@ beginning with specific prefix (e.g. "animals.mammals.dogs."). Multiple
filters can be attached to a single 'sub' socket. In that case message passes
if it matches at least one of the filters.
+
-Type: string Unit: N/A Default: N/A
+Type: BLOB Unit: N/A Default: N/A
*ZMQ_UNSUBSCRIBE*::
Applicable only to ZMQ_SUB socket type. Removes existing message filter.
@@ -81,7 +83,7 @@ exactly. If there were several instances of the same filter created,
this options removes only one of them, leaving the rest in place
and functional.
+
-Type: string Unit: N/A Default: N/A
+Type: BLOB Unit: N/A Default: N/A
*ZMQ_RATE*::
This option applies only to sending side of multicast transports (pgm & udp).
diff --git a/src/Makefile.am b/src/Makefile.am
index 446b1e2..4146f68 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -63,6 +63,7 @@ libzmq_la_SOURCES = app_thread.hpp \
atomic_bitmap.hpp \
atomic_counter.hpp \
atomic_ptr.hpp \
+ blob.hpp \
command.hpp \
config.hpp \
decoder.hpp \
@@ -131,6 +132,7 @@ libzmq_la_SOURCES = app_thread.hpp \
zmq_init.hpp \
zmq_listener.hpp \
app_thread.cpp \
+ command.cpp \
devpoll.cpp \
dispatcher.cpp \
downstream.cpp \
diff --git a/src/blob.hpp b/src/blob.hpp
new file mode 100644
index 0000000..a4fa8cd
--- /dev/null
+++ b/src/blob.hpp
@@ -0,0 +1,33 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_BLOB_HPP_INCLUDED__
+#define __ZMQ_BLOB_HPP_INCLUDED__
+
+#include <string>
+
+namespace zmq
+{
+
+ // Object to hold dynamically allocated opaque binary data.
+ typedef std::basic_string <unsigned char> blob_t;
+
+}
+
+#endif
diff --git a/src/command.cpp b/src/command.cpp
new file mode 100644
index 0000000..8bf7ea2
--- /dev/null
+++ b/src/command.cpp
@@ -0,0 +1,38 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <stdlib.h>
+
+#include "command.hpp"
+
+void zmq::deallocate_command (command_t *cmd_)
+{
+ switch (cmd_->type) {
+ case command_t::attach:
+ if (cmd_->args.attach.peer_identity)
+ free (cmd_->args.attach.peer_identity);
+ break;
+ case command_t::bind:
+ if (cmd_->args.bind.peer_identity)
+ free (cmd_->args.bind.peer_identity);
+ break;
+ default:
+ /* noop */;
+ }
+}
diff --git a/src/command.hpp b/src/command.hpp
index 469d6ec..150cad1 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -66,6 +66,8 @@ namespace zmq
// Attach the engine to the session.
struct {
struct i_engine *engine;
+ unsigned char peer_identity_size;
+ unsigned char *peer_identity;
} attach;
// Sent from session to socket to establish pipe(s) between them.
@@ -73,6 +75,8 @@ namespace zmq
struct {
class reader_t *in_pipe;
class writer_t *out_pipe;
+ unsigned char peer_identity_size;
+ unsigned char *peer_identity;
} bind;
// Sent by pipe writer to inform dormant pipe reader that there
@@ -107,6 +111,9 @@ namespace zmq
} args;
};
+ // Function to deallocate dynamically allocated components of the command.
+ void deallocate_command (command_t *cmd_);
+
}
#endif
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 8aafcf8..4233278 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -117,6 +117,10 @@ zmq::dispatcher_t::~dispatcher_t ()
while (!pipes.empty ())
delete *pipes.begin ();
+ // TODO: Deallocate any commands still in the pipes. Keep in mind that
+ // simple reading from a pipe and deallocating commands won't do as
+ // command pipe has template parameter D set to true, meaning that
+ // read may return false even if there are still commands in the pipe.
delete [] command_pipes;
#ifdef ZMQ_HAVE_WINDOWS
diff --git a/src/downstream.cpp b/src/downstream.cpp
index 29b0689..3431264 100644
--- a/src/downstream.cpp
+++ b/src/downstream.cpp
@@ -26,7 +26,6 @@
zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_DOWNSTREAM;
options.requires_in = false;
options.requires_out = true;
}
@@ -36,7 +35,7 @@ zmq::downstream_t::~downstream_t ()
}
void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_ && outpipe_);
lb.attach (outpipe_);
diff --git a/src/downstream.hpp b/src/downstream.hpp
index 35dec95..dbd79a5 100644
--- a/src/downstream.hpp
+++ b/src/downstream.hpp
@@ -34,7 +34,8 @@ namespace zmq
~downstream_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp
index d60b39e..ddab6a4 100644
--- a/src/i_endpoint.hpp
+++ b/src/i_endpoint.hpp
@@ -20,6 +20,8 @@
#ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__
#define __ZMQ_I_ENDPOINT_HPP_INCLUDED__
+#include "blob.hpp"
+
namespace zmq
{
@@ -28,7 +30,7 @@ namespace zmq
virtual ~i_endpoint () {}
virtual void attach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_) = 0;
+ class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
virtual void detach_inpipe (class reader_t *pipe_) = 0;
virtual void detach_outpipe (class writer_t *pipe_) = 0;
virtual void kill (class reader_t *pipe_) = 0;
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index bcb4297..81b56df 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -22,6 +22,8 @@
#include <stddef.h>
+#include "blob.hpp"
+
namespace zmq
{
@@ -39,11 +41,11 @@ namespace zmq
// are messages to send available.
virtual void revive () = 0;
- // Start tracing the message route. Engine should add the identity
- // supplied to all inbound messages and trim identity from all the
- // outbound messages.
- virtual void traceroute (unsigned char *identity_,
- size_t identity_size_) = 0;
+ // Engine should add the prefix supplied to all inbound messages.
+ virtual void add_prefix (const blob_t &identity_) = 0;
+
+ // Engine should trim prefix from all the outbound messages.
+ virtual void trim_prefix () = 0;
};
}
diff --git a/src/object.cpp b/src/object.cpp
index a977f39..356fcd1 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include "object.hpp"
#include "dispatcher.hpp"
#include "err.hpp"
@@ -77,17 +79,21 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::own:
process_own (cmd_.args.own.object);
- return;
+ break;
case command_t::attach:
- process_attach (cmd_.args.attach.engine);
+ process_attach (cmd_.args.attach.engine,
+ blob_t (cmd_.args.attach.peer_identity,
+ cmd_.args.attach.peer_identity_size));
process_seqnum ();
- return;
+ break;
case command_t::bind:
- process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
+ process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe,
+ blob_t (cmd_.args.bind.peer_identity,
+ cmd_.args.bind.peer_identity_size));
process_seqnum ();
- return;
+ break;
case command_t::pipe_term:
process_pipe_term ();
@@ -95,23 +101,27 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::pipe_term_ack:
process_pipe_term_ack ();
- return;
+ break;
case command_t::term_req:
process_term_req (cmd_.args.term_req.object);
- return;
+ break;
case command_t::term:
process_term ();
- return;
+ break;
case command_t::term_ack:
process_term_ack ();
- return;
+ break;
default:
zmq_assert (false);
}
+
+ // The assumption here is that each command is processed once only,
+ // so deallocating it after processing is all right.
+ deallocate_command (&cmd_);
}
void zmq::object_t::register_pipe (class pipe_t *pipe_)
@@ -176,7 +186,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
}
void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
- bool inc_seqnum_)
+ const blob_t &peer_identity_, bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -185,11 +195,26 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
cmd.destination = destination_;
cmd.type = command_t::attach;
cmd.args.attach.engine = engine_;
+ if (peer_identity_.empty ()) {
+ cmd.args.attach.peer_identity_size = 0;
+ cmd.args.attach.peer_identity = NULL;
+ }
+ else {
+ zmq_assert (peer_identity_.size () <= 0xff);
+ cmd.args.attach.peer_identity_size =
+ (unsigned char) peer_identity_.size ();
+ cmd.args.attach.peer_identity =
+ (unsigned char*) malloc (peer_identity_.size ());
+ zmq_assert (cmd.args.attach.peer_identity_size);
+ memcpy (cmd.args.attach.peer_identity, peer_identity_.data (),
+ peer_identity_.size ());
+ }
send_command (cmd);
}
void zmq::object_t::send_bind (socket_base_t *destination_,
- reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_)
+ reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_,
+ bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -199,6 +224,20 @@ void zmq::object_t::send_bind (socket_base_t *destination_,
cmd.type = command_t::bind;
cmd.args.bind.in_pipe = in_pipe_;
cmd.args.bind.out_pipe = out_pipe_;
+ if (peer_identity_.empty ()) {
+ cmd.args.bind.peer_identity_size = 0;
+ cmd.args.bind.peer_identity = NULL;
+ }
+ else {
+ zmq_assert (peer_identity_.size () <= 0xff);
+ cmd.args.bind.peer_identity_size =
+ (unsigned char) peer_identity_.size ();
+ cmd.args.bind.peer_identity =
+ (unsigned char*) malloc (peer_identity_.size ());
+ zmq_assert (cmd.args.bind.peer_identity_size);
+ memcpy (cmd.args.bind.peer_identity, peer_identity_.data (),
+ peer_identity_.size ());
+ }
send_command (cmd);
}
@@ -267,12 +306,14 @@ void zmq::object_t::process_own (owned_t *object_)
zmq_assert (false);
}
-void zmq::object_t::process_attach (i_engine *engine_)
+void zmq::object_t::process_attach (i_engine *engine_,
+ const blob_t &peer_identity_)
{
zmq_assert (false);
}
-void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+ const blob_t &peer_identity_)
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index e6b2379..1544109 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -21,6 +21,7 @@
#define __ZMQ_OBJECT_HPP_INCLUDED__
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -64,10 +65,11 @@ namespace zmq
void send_own (class socket_base_t *destination_,
class owned_t *object_);
void send_attach (class session_t *destination_,
- struct i_engine *engine_, bool inc_seqnum_ = true);
+ struct i_engine *engine_, const blob_t &peer_identity_,
+ bool inc_seqnum_ = true);
void send_bind (class socket_base_t *destination_,
class reader_t *in_pipe_, class writer_t *out_pipe_,
- bool inc_seqnum_ = true);
+ const blob_t &peer_identity_, bool inc_seqnum_ = true);
void send_revive (class object_t *destination_);
void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_);
@@ -81,9 +83,10 @@ namespace zmq
virtual void process_stop ();
virtual void process_plug ();
virtual void process_own (class owned_t *object_);
- virtual void process_attach (struct i_engine *engine_);
+ virtual void process_attach (struct i_engine *engine_,
+ const blob_t &peer_identity_);
virtual void process_bind (class reader_t *in_pipe_,
- class writer_t *out_pipe_);
+ class writer_t *out_pipe_, const blob_t &peer_identity_);
virtual void process_revive ();
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
diff --git a/src/options.cpp b/src/options.cpp
index f9d93d6..f78d8de 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -23,7 +23,6 @@
#include "err.hpp"
zmq::options_t::options_t () :
- type (-1),
hwm (0),
lwm (0),
swap (0),
@@ -34,7 +33,9 @@ zmq::options_t::options_t () :
sndbuf (0),
rcvbuf (0),
requires_in (false),
- requires_out (false)
+ requires_out (false),
+ immediate_connect (true),
+ traceroute (false)
{
}
@@ -76,7 +77,16 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return 0;
case ZMQ_IDENTITY:
- identity.assign ((const char*) optval_, optvallen_);
+
+ // Empty identity is invalid as well as identity longer than
+ // 255 bytes. Identity starting with binary zero is invalid
+ // as these are used for auto-generated identities.
+ if (optvallen_ < 1 || optvallen_ > 255 ||
+ *((const unsigned char*) optval_) == 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ identity.assign ((const unsigned char*) optval_, optvallen_);
return 0;
case ZMQ_RATE:
diff --git a/src/options.hpp b/src/options.hpp
index dbe3701..6d9be4d 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -20,10 +20,9 @@
#ifndef __ZMQ_OPTIONS_HPP_INCLUDED__
#define __ZMQ_OPTIONS_HPP_INCLUDED__
-#include <string>
-
#include "stddef.h"
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -34,14 +33,11 @@ namespace zmq
int setsockopt (int option_, const void *optval_, size_t optvallen_);
- // Type of the associated socket. One of the constants defined in zmq.h
- int type;
-
int64_t hwm;
int64_t lwm;
int64_t swap;
uint64_t affinity;
- std::string identity;
+ blob_t identity;
// Maximum tranfer rate [kb/s]. Default 100kb/s.
uint32_t rate;
@@ -59,6 +55,15 @@ namespace zmq
// provided by the specific socket type.
bool requires_in;
bool requires_out;
+
+ // If true, when connecting, pipes are created immediately without
+ // waiting for the connection to be established. That way the socket
+ // is not aware of the peer's identity, however, it is able to send
+ // messages straight away.
+ bool immediate_connect;
+
+ // If true, socket requires tracerouting the messages.
+ bool traceroute;
};
}
diff --git a/src/p2p.cpp b/src/p2p.cpp
index 46bbd0b..ca7a8f5 100644
--- a/src/p2p.cpp
+++ b/src/p2p.cpp
@@ -29,7 +29,6 @@ zmq::p2p_t::p2p_t (class app_thread_t *parent_) :
outpipe (NULL),
alive (true)
{
- options.type = ZMQ_P2P;
options.requires_in = true;
options.requires_out = true;
}
@@ -43,7 +42,7 @@ zmq::p2p_t::~p2p_t ()
}
void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe && !outpipe);
inpipe = inpipe_;
diff --git a/src/p2p.hpp b/src/p2p.hpp
index 2ff1047..bca0eab 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -33,7 +33,8 @@ namespace zmq
~p2p_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index a2ba9c6..e708229 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -88,8 +88,13 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
-void zmq::pgm_receiver_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_)
+{
+ // No need for tracerouting functionality in PGM socket at the moment.
+ zmq_assert (false);
+}
+
+void zmq::pgm_receiver_t::trim_prefix ()
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index f03551f..3f0ef81 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -54,7 +54,8 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void add_prefix (const blob_t &identity_);
+ void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index fa7d7e0..27b4d0c 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -102,8 +102,13 @@ void zmq::pgm_sender_t::revive ()
out_event ();
}
-void zmq::pgm_sender_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::pgm_sender_t::add_prefix (const blob_t &identity_)
+{
+ // No need for tracerouting functionality in PGM socket at the moment.
+ zmq_assert (false);
+}
+
+void zmq::pgm_sender_t::trim_prefix ()
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 89357f5..951c417 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -52,7 +52,8 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void add_prefix (const blob_t &identity_);
+ void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 1eeb34f..462a3a9 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -89,8 +89,11 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if (options.identity.size () > 0) {
- // Create gsi from identity string.
- gsi_base = options.identity;
+ // Create gsi from identity.
+ // TODO: We assume that identity is standard C string here.
+ // What if it contains binary zeroes?
+ gsi_base.assign ((const char*) options.identity.data (),
+ options.identity.size ());
} else {