summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am2
-rw-r--r--src/blob.hpp33
-rw-r--r--src/command.cpp38
-rw-r--r--src/command.hpp7
-rw-r--r--src/dispatcher.cpp4
-rw-r--r--src/downstream.cpp3
-rw-r--r--src/downstream.hpp3
-rw-r--r--src/i_endpoint.hpp4
-rw-r--r--src/i_engine.hpp12
-rw-r--r--src/object.cpp67
-rw-r--r--src/object.hpp11
-rw-r--r--src/options.cpp16
-rw-r--r--src/options.hpp17
-rw-r--r--src/p2p.cpp3
-rw-r--r--src/p2p.hpp3
-rw-r--r--src/pgm_receiver.cpp9
-rw-r--r--src/pgm_receiver.hpp3
-rw-r--r--src/pgm_sender.cpp9
-rw-r--r--src/pgm_sender.hpp3
-rw-r--r--src/pgm_socket.cpp7
-rw-r--r--src/pub.cpp3
-rw-r--r--src/pub.hpp3
-rw-r--r--src/rep.cpp8
-rw-r--r--src/rep.hpp3
-rw-r--r--src/req.cpp3
-rw-r--r--src/req.hpp3
-rw-r--r--src/session.cpp134
-rw-r--r--src/session.hpp29
-rw-r--r--src/socket_base.cpp87
-rw-r--r--src/socket_base.hpp19
-rw-r--r--src/sub.cpp3
-rw-r--r--src/sub.hpp3
-rw-r--r--src/upstream.cpp3
-rw-r--r--src/upstream.hpp3
-rw-r--r--src/uuid.hpp6
-rw-r--r--src/xrep.cpp60
-rw-r--r--src/xrep.hpp10
-rw-r--r--src/xreq.cpp3
-rw-r--r--src/xreq.hpp3
-rw-r--r--src/zmq_decoder.cpp62
-rw-r--r--src/zmq_decoder.hpp6
-rw-r--r--src/zmq_encoder.cpp14
-rw-r--r--src/zmq_engine.cpp11
-rw-r--r--src/zmq_engine.hpp3
-rw-r--r--src/zmq_init.cpp53
-rw-r--r--src/zmq_init.hpp5
46 files changed, 528 insertions, 266 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 446b1e2..4146f68 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -63,6 +63,7 @@ libzmq_la_SOURCES = app_thread.hpp \
atomic_bitmap.hpp \
atomic_counter.hpp \
atomic_ptr.hpp \
+ blob.hpp \
command.hpp \
config.hpp \
decoder.hpp \
@@ -131,6 +132,7 @@ libzmq_la_SOURCES = app_thread.hpp \
zmq_init.hpp \
zmq_listener.hpp \
app_thread.cpp \
+ command.cpp \
devpoll.cpp \
dispatcher.cpp \
downstream.cpp \
diff --git a/src/blob.hpp b/src/blob.hpp
new file mode 100644
index 0000000..a4fa8cd
--- /dev/null
+++ b/src/blob.hpp
@@ -0,0 +1,33 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_BLOB_HPP_INCLUDED__
+#define __ZMQ_BLOB_HPP_INCLUDED__
+
+#include <string>
+
+namespace zmq
+{
+
+ // Object to hold dynamically allocated opaque binary data.
+ typedef std::basic_string <unsigned char> blob_t;
+
+}
+
+#endif
diff --git a/src/command.cpp b/src/command.cpp
new file mode 100644
index 0000000..8bf7ea2
--- /dev/null
+++ b/src/command.cpp
@@ -0,0 +1,38 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <stdlib.h>
+
+#include "command.hpp"
+
+void zmq::deallocate_command (command_t *cmd_)
+{
+ switch (cmd_->type) {
+ case command_t::attach:
+ if (cmd_->args.attach.peer_identity)
+ free (cmd_->args.attach.peer_identity);
+ break;
+ case command_t::bind:
+ if (cmd_->args.bind.peer_identity)
+ free (cmd_->args.bind.peer_identity);
+ break;
+ default:
+ /* noop */;
+ }
+}
diff --git a/src/command.hpp b/src/command.hpp
index 469d6ec..150cad1 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -66,6 +66,8 @@ namespace zmq
// Attach the engine to the session.
struct {
struct i_engine *engine;
+ unsigned char peer_identity_size;
+ unsigned char *peer_identity;
} attach;
// Sent from session to socket to establish pipe(s) between them.
@@ -73,6 +75,8 @@ namespace zmq
struct {
class reader_t *in_pipe;
class writer_t *out_pipe;
+ unsigned char peer_identity_size;
+ unsigned char *peer_identity;
} bind;
// Sent by pipe writer to inform dormant pipe reader that there
@@ -107,6 +111,9 @@ namespace zmq
} args;
};
+ // Function to deallocate dynamically allocated components of the command.
+ void deallocate_command (command_t *cmd_);
+
}
#endif
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 8aafcf8..4233278 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -117,6 +117,10 @@ zmq::dispatcher_t::~dispatcher_t ()
while (!pipes.empty ())
delete *pipes.begin ();
+ // TODO: Deallocate any commands still in the pipes. Keep in mind that
+ // simple reading from a pipe and deallocating commands won't do as
+ // command pipe has template parameter D set to true, meaning that
+ // read may return false even if there are still commands in the pipe.
delete [] command_pipes;
#ifdef ZMQ_HAVE_WINDOWS
diff --git a/src/downstream.cpp b/src/downstream.cpp
index 29b0689..3431264 100644
--- a/src/downstream.cpp
+++ b/src/downstream.cpp
@@ -26,7 +26,6 @@
zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_DOWNSTREAM;
options.requires_in = false;
options.requires_out = true;
}
@@ -36,7 +35,7 @@ zmq::downstream_t::~downstream_t ()
}
void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_ && outpipe_);
lb.attach (outpipe_);
diff --git a/src/downstream.hpp b/src/downstream.hpp
index 35dec95..dbd79a5 100644
--- a/src/downstream.hpp
+++ b/src/downstream.hpp
@@ -34,7 +34,8 @@ namespace zmq
~downstream_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp
index d60b39e..ddab6a4 100644
--- a/src/i_endpoint.hpp
+++ b/src/i_endpoint.hpp
@@ -20,6 +20,8 @@
#ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__
#define __ZMQ_I_ENDPOINT_HPP_INCLUDED__
+#include "blob.hpp"
+
namespace zmq
{
@@ -28,7 +30,7 @@ namespace zmq
virtual ~i_endpoint () {}
virtual void attach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_) = 0;
+ class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
virtual void detach_inpipe (class reader_t *pipe_) = 0;
virtual void detach_outpipe (class writer_t *pipe_) = 0;
virtual void kill (class reader_t *pipe_) = 0;
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index bcb4297..81b56df 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -22,6 +22,8 @@
#include <stddef.h>
+#include "blob.hpp"
+
namespace zmq
{
@@ -39,11 +41,11 @@ namespace zmq
// are messages to send available.
virtual void revive () = 0;
- // Start tracing the message route. Engine should add the identity
- // supplied to all inbound messages and trim identity from all the
- // outbound messages.
- virtual void traceroute (unsigned char *identity_,
- size_t identity_size_) = 0;
+ // Engine should add the prefix supplied to all inbound messages.
+ virtual void add_prefix (const blob_t &identity_) = 0;
+
+ // Engine should trim prefix from all the outbound messages.
+ virtual void trim_prefix () = 0;
};
}
diff --git a/src/object.cpp b/src/object.cpp
index a977f39..356fcd1 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include "object.hpp"
#include "dispatcher.hpp"
#include "err.hpp"
@@ -77,17 +79,21 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::own:
process_own (cmd_.args.own.object);
- return;
+ break;
case command_t::attach:
- process_attach (cmd_.args.attach.engine);
+ process_attach (cmd_.args.attach.engine,
+ blob_t (cmd_.args.attach.peer_identity,
+ cmd_.args.attach.peer_identity_size));
process_seqnum ();
- return;
+ break;
case command_t::bind:
- process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
+ process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe,
+ blob_t (cmd_.args.bind.peer_identity,
+ cmd_.args.bind.peer_identity_size));
process_seqnum ();
- return;
+ break;
case command_t::pipe_term:
process_pipe_term ();
@@ -95,23 +101,27 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::pipe_term_ack:
process_pipe_term_ack ();
- return;
+ break;
case command_t::term_req:
process_term_req (cmd_.args.term_req.object);
- return;
+ break;
case command_t::term:
process_term ();
- return;
+ break;
case command_t::term_ack:
process_term_ack ();
- return;
+ break;
default:
zmq_assert (false);
}
+
+ // The assumption here is that each command is processed once only,
+ // so deallocating it after processing is all right.
+ deallocate_command (&cmd_);
}
void zmq::object_t::register_pipe (class pipe_t *pipe_)
@@ -176,7 +186,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
}
void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
- bool inc_seqnum_)
+ const blob_t &peer_identity_, bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -185,11 +195,26 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
cmd.destination = destination_;
cmd.type = command_t::attach;
cmd.args.attach.engine = engine_;
+ if (peer_identity_.empty ()) {
+ cmd.args.attach.peer_identity_size = 0;
+ cmd.args.attach.peer_identity = NULL;
+ }
+ else {
+ zmq_assert (peer_identity_.size () <= 0xff);
+ cmd.args.attach.peer_identity_size =
+ (unsigned char) peer_identity_.size ();
+ cmd.args.attach.peer_identity =
+ (unsigned char*) malloc (peer_identity_.size ());
+ zmq_assert (cmd.args.attach.peer_identity_size);
+ memcpy (cmd.args.attach.peer_identity, peer_identity_.data (),
+ peer_identity_.size ());
+ }
send_command (cmd);
}
void zmq::object_t::send_bind (socket_base_t *destination_,
- reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_)
+ reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_,
+ bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -199,6 +224,20 @@ void zmq::object_t::send_bind (socket_base_t *destination_,
cmd.type = command_t::bind;
cmd.args.bind.in_pipe = in_pipe_;
cmd.args.bind.out_pipe = out_pipe_;
+ if (peer_identity_.empty ()) {
+ cmd.args.bind.peer_identity_size = 0;
+ cmd.args.bind.peer_identity = NULL;
+ }
+ else {
+ zmq_assert (peer_identity_.size () <= 0xff);
+ cmd.args.bind.peer_identity_size =
+ (unsigned char) peer_identity_.size ();
+ cmd.args.bind.peer_identity =
+ (unsigned char*) malloc (peer_identity_.size ());
+ zmq_assert (cmd.args.bind.peer_identity_size);
+ memcpy (cmd.args.bind.peer_identity, peer_identity_.data (),
+ peer_identity_.size ());
+ }
send_command (cmd);
}
@@ -267,12 +306,14 @@ void zmq::object_t::process_own (owned_t *object_)
zmq_assert (false);
}
-void zmq::object_t::process_attach (i_engine *engine_)
+void zmq::object_t::process_attach (i_engine *engine_,
+ const blob_t &peer_identity_)
{
zmq_assert (false);
}
-void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+ const blob_t &peer_identity_)
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index e6b2379..1544109 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -21,6 +21,7 @@
#define __ZMQ_OBJECT_HPP_INCLUDED__
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -64,10 +65,11 @@ namespace zmq
void send_own (class socket_base_t *destination_,
class owned_t *object_);
void send_attach (class session_t *destination_,
- struct i_engine *engine_, bool inc_seqnum_ = true);
+ struct i_engine *engine_, const blob_t &peer_identity_,
+ bool inc_seqnum_ = true);
void send_bind (class socket_base_t *destination_,
class reader_t *in_pipe_, class writer_t *out_pipe_,
- bool inc_seqnum_ = true);
+ const blob_t &peer_identity_, bool inc_seqnum_ = true);
void send_revive (class object_t *destination_);
void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_);
@@ -81,9 +83,10 @@ namespace zmq
virtual void process_stop ();
virtual void process_plug ();
virtual void process_own (class owned_t *object_);
- virtual void process_attach (struct i_engine *engine_);
+ virtual void process_attach (struct i_engine *engine_,
+ const blob_t &peer_identity_);
virtual void process_bind (class reader_t *in_pipe_,
- class writer_t *out_pipe_);
+ class writer_t *out_pipe_, const blob_t &peer_identity_);
virtual void process_revive ();
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
diff --git a/src/options.cpp b/src/options.cpp
index f9d93d6..f78d8de 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -23,7 +23,6 @@
#include "err.hpp"
zmq::options_t::options_t () :
- type (-1),
hwm (0),
lwm (0),
swap (0),
@@ -34,7 +33,9 @@ zmq::options_t::options_t () :
sndbuf (0),
rcvbuf (0),
requires_in (false),
- requires_out (false)
+ requires_out (false),
+ immediate_connect (true),
+ traceroute (false)
{
}
@@ -76,7 +77,16 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return 0;
case ZMQ_IDENTITY:
- identity.assign ((const char*) optval_, optvallen_);
+
+ // Empty identity is invalid as well as identity longer than
+ // 255 bytes. Identity starting with binary zero is invalid
+ // as these are used for auto-generated identities.
+ if (optvallen_ < 1 || optvallen_ > 255 ||
+ *((const unsigned char*) optval_) == 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ identity.assign ((const unsigned char*) optval_, optvallen_);
return 0;
case ZMQ_RATE:
diff --git a/src/options.hpp b/src/options.hpp
index dbe3701..6d9be4d 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -20,10 +20,9 @@
#ifndef __ZMQ_OPTIONS_HPP_INCLUDED__
#define __ZMQ_OPTIONS_HPP_INCLUDED__
-#include <string>
-
#include "stddef.h"
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -34,14 +33,11 @@ namespace zmq
int setsockopt (int option_, const void *optval_, size_t optvallen_);
- // Type of the associated socket. One of the constants defined in zmq.h
- int type;
-
int64_t hwm;
int64_t lwm;
int64_t swap;
uint64_t affinity;
- std::string identity;
+ blob_t identity;
// Maximum tranfer rate [kb/s]. Default 100kb/s.
uint32_t rate;
@@ -59,6 +55,15 @@ namespace zmq
// provided by the specific socket type.
bool requires_in;
bool requires_out;
+
+ // If true, when connecting, pipes are created immediately without
+ // waiting for the connection to be established. That way the socket
+ // is not aware of the peer's identity, however, it is able to send
+ // messages straight away.
+ bool immediate_connect;
+
+ // If true, socket requires tracerouting the messages.
+ bool traceroute;
};
}
diff --git a/src/p2p.cpp b/src/p2p.cpp
index 46bbd0b..ca7a8f5 100644
--- a/src/p2p.cpp
+++ b/src/p2p.cpp
@@ -29,7 +29,6 @@ zmq::p2p_t::p2p_t (class app_thread_t *parent_) :
outpipe (NULL),
alive (true)
{
- options.type = ZMQ_P2P;
options.requires_in = true;
options.requires_out = true;
}
@@ -43,7 +42,7 @@ zmq::p2p_t::~p2p_t ()
}
void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe && !outpipe);
inpipe = inpipe_;
diff --git a/src/p2p.hpp b/src/p2p.hpp
index 2ff1047..bca0eab 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -33,7 +33,8 @@ namespace zmq
~p2p_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index a2ba9c6..e708229 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -88,8 +88,13 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
-void zmq::pgm_receiver_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_)
+{
+ // No need for tracerouting functionality in PGM socket at the moment.
+ zmq_assert (false);
+}
+
+void zmq::pgm_receiver_t::trim_prefix ()
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index f03551f..3f0ef81 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -54,7 +54,8 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void add_prefix (const blob_t &identity_);
+ void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index fa7d7e0..27b4d0c 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -102,8 +102,13 @@ void zmq::pgm_sender_t::revive ()
out_event ();
}
-void zmq::pgm_sender_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::pgm_sender_t::add_prefix (const blob_t &identity_)
+{
+ // No need for tracerouting functionality in PGM socket at the moment.
+ zmq_assert (false);
+}
+
+void zmq::pgm_sender_t::trim_prefix ()
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 89357f5..951c417 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -52,7 +52,8 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void add_prefix (const blob_t &identity_);
+ void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 1eeb34f..462a3a9 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -89,8 +89,11 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if (options.identity.size () > 0) {
- // Create gsi from identity string.
- gsi_base = options.identity;
+ // Create gsi from identity.
+ // TODO: We assume that identity is standard C string here.
+ // What if it contains binary zeroes?
+ gsi_base.assign ((const char*) options.identity.data (),
+ options.identity.size ());
} else {
// Generate random gsi.
diff --git a/src/pub.cpp b/src/pub.cpp
index 9a2dcc6..5b9d48c 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -27,7 +27,6 @@
zmq::pub_t::pub_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_PUB;
options.requires_in = false;
options.requires_out = true;
}
@@ -40,7 +39,7 @@ zmq::pub_t::~pub_t ()
}
void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_);
out_pipes.push_back (outpipe_);
diff --git a/src/pub.hpp b/src/pub.hpp
index 5b2f348..26142a4 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -34,7 +34,8 @@ namespace zmq
~pub_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/rep.cpp b/src/rep.cpp
index 1649e83..755d78e 100644
--- a/src/rep.cpp
+++ b/