summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am2
-rw-r--r--src/blob.hpp33
-rw-r--r--src/command.cpp38
-rw-r--r--src/command.hpp7
-rw-r--r--src/dispatcher.cpp4
-rw-r--r--src/downstream.cpp1
-rw-r--r--src/i_engine.hpp5
-rw-r--r--src/object.cpp67
-rw-r--r--src/object.hpp11
-rw-r--r--src/options.cpp7
-rw-r--r--src/options.hpp17
-rw-r--r--src/p2p.cpp1
-rw-r--r--src/pgm_receiver.cpp3
-rw-r--r--src/pgm_receiver.hpp2
-rw-r--r--src/pgm_sender.cpp3
-rw-r--r--src/pgm_sender.hpp2
-rw-r--r--src/pgm_socket.cpp7
-rw-r--r--src/pub.cpp1
-rw-r--r--src/rep.cpp6
-rw-r--r--src/req.cpp1
-rw-r--r--src/session.cpp127
-rw-r--r--src/session.hpp26
-rw-r--r--src/socket_base.cpp79
-rw-r--r--src/socket_base.hpp14
-rw-r--r--src/sub.cpp1
-rw-r--r--src/upstream.cpp1
-rw-r--r--src/xrep.cpp9
-rw-r--r--src/xreq.cpp1
-rw-r--r--src/zmq_decoder.cpp34
-rw-r--r--src/zmq_decoder.hpp6
-rw-r--r--src/zmq_engine.cpp5
-rw-r--r--src/zmq_engine.hpp2
-rw-r--r--src/zmq_init.cpp17
-rw-r--r--src/zmq_init.hpp5
34 files changed, 349 insertions, 196 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 446b1e2..4146f68 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -63,6 +63,7 @@ libzmq_la_SOURCES = app_thread.hpp \
atomic_bitmap.hpp \
atomic_counter.hpp \
atomic_ptr.hpp \
+ blob.hpp \
command.hpp \
config.hpp \
decoder.hpp \
@@ -131,6 +132,7 @@ libzmq_la_SOURCES = app_thread.hpp \
zmq_init.hpp \
zmq_listener.hpp \
app_thread.cpp \
+ command.cpp \
devpoll.cpp \
dispatcher.cpp \
downstream.cpp \
diff --git a/src/blob.hpp b/src/blob.hpp
new file mode 100644
index 0000000..a4fa8cd
--- /dev/null
+++ b/src/blob.hpp
@@ -0,0 +1,33 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_BLOB_HPP_INCLUDED__
+#define __ZMQ_BLOB_HPP_INCLUDED__
+
+#include <string>
+
+namespace zmq
+{
+
+ // Object to hold dynamically allocated opaque binary data.
+ typedef std::basic_string <unsigned char> blob_t;
+
+}
+
+#endif
diff --git a/src/command.cpp b/src/command.cpp
new file mode 100644
index 0000000..8bf7ea2
--- /dev/null
+++ b/src/command.cpp
@@ -0,0 +1,38 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <stdlib.h>
+
+#include "command.hpp"
+
+void zmq::deallocate_command (command_t *cmd_)
+{
+ switch (cmd_->type) {
+ case command_t::attach:
+ if (cmd_->args.attach.peer_identity)
+ free (cmd_->args.attach.peer_identity);
+ break;
+ case command_t::bind:
+ if (cmd_->args.bind.peer_identity)
+ free (cmd_->args.bind.peer_identity);
+ break;
+ default:
+ /* noop */;
+ }
+}
diff --git a/src/command.hpp b/src/command.hpp
index 469d6ec..150cad1 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -66,6 +66,8 @@ namespace zmq
// Attach the engine to the session.
struct {
struct i_engine *engine;
+ unsigned char peer_identity_size;
+ unsigned char *peer_identity;
} attach;
// Sent from session to socket to establish pipe(s) between them.
@@ -73,6 +75,8 @@ namespace zmq
struct {
class reader_t *in_pipe;
class writer_t *out_pipe;
+ unsigned char peer_identity_size;
+ unsigned char *peer_identity;
} bind;
// Sent by pipe writer to inform dormant pipe reader that there
@@ -107,6 +111,9 @@ namespace zmq
} args;
};
+ // Function to deallocate dynamically allocated components of the command.
+ void deallocate_command (command_t *cmd_);
+
}
#endif
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 8aafcf8..4233278 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -117,6 +117,10 @@ zmq::dispatcher_t::~dispatcher_t ()
while (!pipes.empty ())
delete *pipes.begin ();
+ // TODO: Deallocate any commands still in the pipes. Keep in mind that
+ // simple reading from a pipe and deallocating commands won't do as
+ // command pipe has template parameter D set to true, meaning that
+ // read may return false even if there are still commands in the pipe.
delete [] command_pipes;
#ifdef ZMQ_HAVE_WINDOWS
diff --git a/src/downstream.cpp b/src/downstream.cpp
index 29b0689..2da08e3 100644
--- a/src/downstream.cpp
+++ b/src/downstream.cpp
@@ -26,7 +26,6 @@
zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_DOWNSTREAM;
options.requires_in = false;
options.requires_out = true;
}
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index bcb4297..d64027d 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -22,6 +22,8 @@
#include <stddef.h>
+#include "blob.hpp"
+
namespace zmq
{
@@ -42,8 +44,7 @@ namespace zmq
// Start tracing the message route. Engine should add the identity
// supplied to all inbound messages and trim identity from all the
// outbound messages.
- virtual void traceroute (unsigned char *identity_,
- size_t identity_size_) = 0;
+ virtual void traceroute (const blob_t &identity_) = 0;
};
}
diff --git a/src/object.cpp b/src/object.cpp
index a977f39..356fcd1 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include "object.hpp"
#include "dispatcher.hpp"
#include "err.hpp"
@@ -77,17 +79,21 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::own:
process_own (cmd_.args.own.object);
- return;
+ break;
case command_t::attach:
- process_attach (cmd_.args.attach.engine);
+ process_attach (cmd_.args.attach.engine,
+ blob_t (cmd_.args.attach.peer_identity,
+ cmd_.args.attach.peer_identity_size));
process_seqnum ();
- return;
+ break;
case command_t::bind:
- process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
+ process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe,
+ blob_t (cmd_.args.bind.peer_identity,
+ cmd_.args.bind.peer_identity_size));
process_seqnum ();
- return;
+ break;
case command_t::pipe_term:
process_pipe_term ();
@@ -95,23 +101,27 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::pipe_term_ack:
process_pipe_term_ack ();
- return;
+ break;
case command_t::term_req:
process_term_req (cmd_.args.term_req.object);
- return;
+ break;
case command_t::term:
process_term ();
- return;
+ break;
case command_t::term_ack:
process_term_ack ();
- return;
+ break;
default:
zmq_assert (false);
}
+
+ // The assumption here is that each command is processed once only,
+ // so deallocating it after processing is all right.
+ deallocate_command (&cmd_);
}
void zmq::object_t::register_pipe (class pipe_t *pipe_)
@@ -176,7 +186,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
}
void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
- bool inc_seqnum_)
+ const blob_t &peer_identity_, bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -185,11 +195,26 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
cmd.destination = destination_;
cmd.type = command_t::attach;
cmd.args.attach.engine = engine_;
+ if (peer_identity_.empty ()) {
+ cmd.args.attach.peer_identity_size = 0;
+ cmd.args.attach.peer_identity = NULL;
+ }
+ else {
+ zmq_assert (peer_identity_.size () <= 0xff);
+ cmd.args.attach.peer_identity_size =
+ (unsigned char) peer_identity_.size ();
+ cmd.args.attach.peer_identity =
+ (unsigned char*) malloc (peer_identity_.size ());
+ zmq_assert (cmd.args.attach.peer_identity_size);
+ memcpy (cmd.args.attach.peer_identity, peer_identity_.data (),
+ peer_identity_.size ());
+ }
send_command (cmd);
}
void zmq::object_t::send_bind (socket_base_t *destination_,
- reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_)
+ reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_,
+ bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -199,6 +224,20 @@ void zmq::object_t::send_bind (socket_base_t *destination_,
cmd.type = command_t::bind;
cmd.args.bind.in_pipe = in_pipe_;
cmd.args.bind.out_pipe = out_pipe_;
+ if (peer_identity_.empty ()) {
+ cmd.args.bind.peer_identity_size = 0;
+ cmd.args.bind.peer_identity = NULL;
+ }
+ else {
+ zmq_assert (peer_identity_.size () <= 0xff);
+ cmd.args.bind.peer_identity_size =
+ (unsigned char) peer_identity_.size ();
+ cmd.args.bind.peer_identity =
+ (unsigned char*) malloc (peer_identity_.size ());
+ zmq_assert (cmd.args.bind.peer_identity_size);
+ memcpy (cmd.args.bind.peer_identity, peer_identity_.data (),
+ peer_identity_.size ());
+ }
send_command (cmd);
}
@@ -267,12 +306,14 @@ void zmq::object_t::process_own (owned_t *object_)
zmq_assert (false);
}
-void zmq::object_t::process_attach (i_engine *engine_)
+void zmq::object_t::process_attach (i_engine *engine_,
+ const blob_t &peer_identity_)
{
zmq_assert (false);
}
-void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+ const blob_t &peer_identity_)
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index e6b2379..1544109 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -21,6 +21,7 @@
#define __ZMQ_OBJECT_HPP_INCLUDED__
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -64,10 +65,11 @@ namespace zmq
void send_own (class socket_base_t *destination_,
class owned_t *object_);
void send_attach (class session_t *destination_,
- struct i_engine *engine_, bool inc_seqnum_ = true);
+ struct i_engine *engine_, const blob_t &peer_identity_,
+ bool inc_seqnum_ = true);
void send_bind (class socket_base_t *destination_,
class reader_t *in_pipe_, class writer_t *out_pipe_,
- bool inc_seqnum_ = true);
+ const blob_t &peer_identity_, bool inc_seqnum_ = true);
void send_revive (class object_t *destination_);
void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_);
@@ -81,9 +83,10 @@ namespace zmq
virtual void process_stop ();
virtual void process_plug ();
virtual void process_own (class owned_t *object_);
- virtual void process_attach (struct i_engine *engine_);
+ virtual void process_attach (struct i_engine *engine_,
+ const blob_t &peer_identity_);
virtual void process_bind (class reader_t *in_pipe_,
- class writer_t *out_pipe_);
+ class writer_t *out_pipe_, const blob_t &peer_identity_);
virtual void process_revive ();
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
diff --git a/src/options.cpp b/src/options.cpp
index f9d93d6..b77af24 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -23,7 +23,6 @@
#include "err.hpp"
zmq::options_t::options_t () :
- type (-1),
hwm (0),
lwm (0),
swap (0),
@@ -34,7 +33,9 @@ zmq::options_t::options_t () :
sndbuf (0),
rcvbuf (0),
requires_in (false),
- requires_out (false)
+ requires_out (false),
+ immediate_connect (true),
+ traceroute (false)
{
}
@@ -76,7 +77,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return 0;
case ZMQ_IDENTITY:
- identity.assign ((const char*) optval_, optvallen_);
+ identity.assign ((const unsigned char*) optval_, optvallen_);
return 0;
case ZMQ_RATE:
diff --git a/src/options.hpp b/src/options.hpp
index dbe3701..6d9be4d 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -20,10 +20,9 @@
#ifndef __ZMQ_OPTIONS_HPP_INCLUDED__
#define __ZMQ_OPTIONS_HPP_INCLUDED__
-#include <string>
-
#include "stddef.h"
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -34,14 +33,11 @@ namespace zmq
int setsockopt (int option_, const void *optval_, size_t optvallen_);
- // Type of the associated socket. One of the constants defined in zmq.h
- int type;
-
int64_t hwm;
int64_t lwm;
int64_t swap;
uint64_t affinity;
- std::string identity;
+ blob_t identity;
// Maximum tranfer rate [kb/s]. Default 100kb/s.
uint32_t rate;
@@ -59,6 +55,15 @@ namespace zmq
// provided by the specific socket type.
bool requires_in;
bool requires_out;
+
+ // If true, when connecting, pipes are created immediately without
+ // waiting for the connection to be established. That way the socket
+ // is not aware of the peer's identity, however, it is able to send
+ // messages straight away.
+ bool immediate_connect;
+
+ // If true, socket requires tracerouting the messages.
+ bool traceroute;
};
}
diff --git a/src/p2p.cpp b/src/p2p.cpp
index 46bbd0b..72bc26b 100644
--- a/src/p2p.cpp
+++ b/src/p2p.cpp
@@ -29,7 +29,6 @@ zmq::p2p_t::p2p_t (class app_thread_t *parent_) :
outpipe (NULL),
alive (true)
{
- options.type = ZMQ_P2P;
options.requires_in = true;
options.requires_out = true;
}
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index a2ba9c6..b7ca327 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -88,8 +88,7 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
-void zmq::pgm_receiver_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::pgm_receiver_t::traceroute (const blob_t &identity_)
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index f03551f..b01e5b0 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -54,7 +54,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void traceroute (const blob_t &identity_);
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index fa7d7e0..acbc3fb 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -102,8 +102,7 @@ void zmq::pgm_sender_t::revive ()
out_event ();
}
-void zmq::pgm_sender_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::pgm_sender_t::traceroute (const blob_t &identity_)
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 89357f5..7041610 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -52,7 +52,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void traceroute (const blob_t &identity_);
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 1eeb34f..462a3a9 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -89,8 +89,11 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if (options.identity.size () > 0) {
- // Create gsi from identity string.
- gsi_base = options.identity;
+ // Create gsi from identity.
+ // TODO: We assume that identity is standard C string here.
+ // What if it contains binary zeroes?
+ gsi_base.assign ((const char*) options.identity.data (),
+ options.identity.size ());
} else {
// Generate random gsi.
diff --git a/src/pub.cpp b/src/pub.cpp
index 9a2dcc6..05bfdcf 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -27,7 +27,6 @@
zmq::pub_t::pub_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_PUB;
options.requires_in = false;
options.requires_out = true;
}
diff --git a/src/rep.cpp b/src/rep.cpp
index b7685b4..8c5b86c 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -30,9 +30,13 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) :
waiting_for_reply (false),
reply_pipe (NULL)
{
- options.type = ZMQ_REP;
options.requires_in = true;
options.requires_out = true;
+
+ // We don't need immediate connect. We'll be able to send messages
+ // (replies) only when connection is established and thus requests
+ // can arrive anyway.
+ options.immediate_connect = false;
}
zmq::rep_t::~rep_t ()
diff --git a/src/req.cpp b/src/req.cpp
index 9b1766e..c9240e0 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -30,7 +30,6 @@ zmq::req_t::req_t (class app_thread_t *parent_) :
reply_pipe_active (false),
reply_pipe (NULL)
{
- options.type = ZMQ_REQ;
options.requires_in = true;
options.requires_out = true;
}
diff --git a/src/session.cpp b/src/session.cpp
index 1aece4d..f86327e 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -32,9 +32,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
out_pipe (NULL),
engine (NULL),
options (options_)
-{
- type = unnamed;
-
+{
// It's possible to register the session at this point as it will be
// searched for only on reconnect, i.e. no race condition (session found
// before it is plugged into it's I/O thread) is possible.
@@ -42,23 +40,24 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
}
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
- const options_t &options_, const char *name_) :
+ const options_t &options_, const blob_t &peer_identity_) :
owned_t (parent_, owner_),
in_pipe (NULL),
active (true),
out_pipe (NULL),
engine (NULL),
+ ordinal (0),
+ peer_identity (peer_identity_),
options (options_)
{
- if (name_) {
- type = named;
- name = name_;
- ordinal = 0;
- }
- else {
- type = transient;
- // TODO: Generate unique name here.
- ordinal = 0;
+ if (!peer_identity.empty ()) {
+ if (!owner->register_session (peer_identity, this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should presumably syslog it and drop the
+ // session.
+ zmq_assert (false);
+ }
}
}
@@ -104,7 +103,7 @@ void zmq::session_t::detach (owned_t *reconnecter_)
engine = NULL;
// Terminate transient session.
- if (type == transient)
+ if (!ordinal && peer_identity.empty ())
term ();
}
@@ -120,7 +119,6 @@ class zmq::socket_base_t *zmq::session_t::get_owner ()
uint64_t zmq::session_t::get_ordinal ()
{
- zmq_assert (type == unnamed);
zmq_assert (ordinal);
return ordinal;
}
@@ -168,52 +166,15 @@ void zmq::session_t::revive (reader_t *pipe_)
void zmq::session_t::process_plug ()
{
- // Register the session with the socket.
- if (!name.empty ()) {
- bool ok = owner->register_session (name.c_str (), this);
-
- // There's already a session with the specified identity.
- // We should syslog it and drop the session. TODO
- zmq_assert (ok);
- }
-
- // If session is created by 'connect' function, it has the pipes set
- // already. Otherwise, it's being created by the listener and the pipes
- // are yet to be created.
- if (!in_pipe && !out_pipe) {
-
- pipe_t *inbound = NULL;
- pipe_t *outbound = NULL;
-
- if (options.requires_out) {
- inbound = new (std::nothrow) pipe_t (this, owner,
- options.hwm, options.lwm);
- zmq_assert (inbound);
- in_pipe = &inbound->reader;
- in_pipe->set_endpoint (this);
- }
-
- if (options.requires_in) {
- outbound = new (std::nothrow) pipe_t (owner, this,
- options.hwm, options.lwm);
- zmq_assert (outbound);
- out_pipe = &outbound->writer;
- out_pipe->set_endpoint (this);
- }
-
- send_bind (owner, outbound ? &outbound->reader : NULL,
- inbound ? &inbound->writer : NULL);
- }
}
void zmq::session_t::process_unplug ()
{
- // Unregister the session from the socket. There's nothing to do here
- // for transient sessions.
- if (type == unnamed)
+ // Unregister the session from the socket.
+ if (ordinal)
owner->unregister_session (ordinal);
- else if (type == named)
- owner->unregister_session (name.c_str ());
+ else if (!peer_identity.empty ())
+ owner->unregister_session (peer_identity);
// Ask associated pipes to terminate.
if (in_pipe) {
@@ -232,8 +193,60 @@ void zmq::session_t::process_unplug ()
}
}
-void zmq::session_t::process_attach (i_engine *engine_)
+void zmq::session_t::process_attach (i_engine *engine_,
+ const blob_t &peer_identity_)
{
+ if (!peer_identity.empty ()) {
+
+ // If we already know the peer name do nothing, just check whether
+ // it haven't changed.
+ zmq_assert (peer_identity == peer_identity_);
+ }
+ else if (!peer_identity_.empty ()) {
+
+ // Store the peer identity.
+ peer_identity = peer_identity_;
+
+ // If the session is not registered with the ordinal, let's register
+ // it using the peer name.
+ if (!ordinal) {
+ if (!owner->register_session (peer_identity, this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should presumably syslog it and drop the
+ // session.
+ zmq_assert (false);
+ }
+ }
+ }
+
+ // Check whether the required pipes already exist. If not so, we'll
+ // create them and bind them to the socket object.
+ reader_t *socket_reader = NULL;
+ writer_t *socket_writer = NULL;
+
+ if (options.requires_in && !out_pipe) {
+ pipe_t *pipe = new (std::nothrow) pipe_t (owner, this,
+ options.hwm, options.lwm);
+ zmq_assert (pipe);
+ out_pipe = &pipe->writer;
+ out_pipe->set_endpoint (this);
+ socket_reader = &pipe->reader;
+ }
+
+ if (options.requires_out && !in_pipe) {
+ pipe_t *pipe = new (std::nothrow) pipe_t (this, owner,
+ options.hwm, options.lwm);
+ zmq_assert (pipe);
+ in_pipe = &pipe->reader;
+ in_pipe->set_endpoint (this);
+ socket_writer = &pipe->writer;
+ }
+
+ if (socket_reader || socket_writer)
+ send_bind (owner, socket_reader, socket_writer, peer_identity);
+
+ // Plug in the engine.
zmq_assert (!engine);
zmq_assert (engine_);
engine = engine_;
diff --git a/src/session.hpp b/src/session.hpp
index 375d095..d412728 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -20,12 +20,11 @@
#ifndef __ZMQ_SESSION_HPP_INCLUDED__
#define __ZMQ_SESSION_HPP_INCLUDED__
-#include <string>
-
#include "i_inout.hpp"
#include "i_endpoint.hpp"
#include "owned.hpp"
#include "options.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -38,10 +37,9 @@ namespace zmq
session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_);
<