summaryrefslogtreecommitdiff
path: root/src/session.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/session.cpp')
-rw-r--r--src/session.cpp134
1 files changed, 76 insertions, 58 deletions
diff --git a/src/session.cpp b/src/session.cpp
index 1aece4d..05f319c 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -32,9 +32,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
out_pipe (NULL),
engine (NULL),
options (options_)
-{
- type = unnamed;
-
+{
// It's possible to register the session at this point as it will be
// searched for only on reconnect, i.e. no race condition (session found
// before it is plugged into it's I/O thread) is possible.
@@ -42,23 +40,24 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
}
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
- const options_t &options_, const char *name_) :
+ const options_t &options_, const blob_t &peer_identity_) :
owned_t (parent_, owner_),
in_pipe (NULL),
active (true),
out_pipe (NULL),
engine (NULL),
+ ordinal (0),
+ peer_identity (peer_identity_),
options (options_)
{
- if (name_) {
- type = named;
- name = name_;
- ordinal = 0;
- }
- else {
- type = transient;
- // TODO: Generate unique name here.
- ordinal = 0;
+ if (!peer_identity.empty () && peer_identity [0] != 0) {
+ if (!owner->register_session (peer_identity, this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should presumably syslog it and drop the
+ // session.
+ zmq_assert (false);
+ }
}
}
@@ -104,7 +103,7 @@ void zmq::session_t::detach (owned_t *reconnecter_)
engine = NULL;
// Terminate transient session.
- if (type == transient)
+ if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0))
term ();
}
@@ -120,13 +119,12 @@ class zmq::socket_base_t *zmq::session_t::get_owner ()
uint64_t zmq::session_t::get_ordinal ()
{
- zmq_assert (type == unnamed);
zmq_assert (ordinal);
return ordinal;
}
void zmq::session_t::attach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
if (inpipe_) {
zmq_assert (!in_pipe);
@@ -168,52 +166,15 @@ void zmq::session_t::revive (reader_t *pipe_)
void zmq::session_t::process_plug ()
{
- // Register the session with the socket.
- if (!name.empty ()) {
- bool ok = owner->register_session (name.c_str (), this);
-
- // There's already a session with the specified identity.
- // We should syslog it and drop the session. TODO
- zmq_assert (ok);
- }
-
- // If session is created by 'connect' function, it has the pipes set
- // already. Otherwise, it's being created by the listener and the pipes
- // are yet to be created.
- if (!in_pipe && !out_pipe) {
-
- pipe_t *inbound = NULL;
- pipe_t *outbound = NULL;
-
- if (options.requires_out) {
- inbound = new (std::nothrow) pipe_t (this, owner,
- options.hwm, options.lwm);
- zmq_assert (inbound);
- in_pipe = &inbound->reader;
- in_pipe->set_endpoint (this);
- }
-
- if (options.requires_in) {
- outbound = new (std::nothrow) pipe_t (owner, this,
- options.hwm, options.lwm);
- zmq_assert (outbound);
- out_pipe = &outbound->writer;
- out_pipe->set_endpoint (this);
- }
-
- send_bind (owner, outbound ? &outbound->reader : NULL,
- inbound ? &inbound->writer : NULL);
- }
}
void zmq::session_t::process_unplug ()
{
- // Unregister the session from the socket. There's nothing to do here
- // for transient sessions.
- if (type == unnamed)
+ // Unregister the session from the socket.
+ if (ordinal)
owner->unregister_session (ordinal);
- else if (type == named)
- owner->unregister_session (name.c_str ());
+ else if (!peer_identity.empty () && peer_identity [0] != 0)
+ owner->unregister_session (peer_identity);
// Ask associated pipes to terminate.
if (in_pipe) {
@@ -232,10 +193,67 @@ void zmq::session_t::process_unplug ()
}
}
-void zmq::session_t::process_attach (i_engine *engine_)
+void zmq::session_t::process_attach (i_engine *engine_,
+ const blob_t &peer_identity_)
{
+ if (!peer_identity.empty ()) {
+
+ // If we already know the peer name do nothing, just check whether
+ // it haven't changed.
+ zmq_assert (peer_identity == peer_identity_);
+ }
+ else if (!peer_identity_.empty ()) {
+
+ // Store the peer identity.
+ peer_identity = peer_identity_;
+
+ // If the session is not registered with the ordinal, let's register
+ // it using the peer name.
+ if (!ordinal) {
+ if (!owner->register_session (peer_identity, this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should presumably syslog it and drop the
+ // session.
+ zmq_assert (false);
+ }
+ }
+ }
+
+ // Check whether the required pipes already exist. If not so, we'll
+ // create them and bind them to the socket object.
+ reader_t *socket_reader = NULL;
+ writer_t *socket_writer = NULL;
+
+ if (options.requires_in && !out_pipe) {
+ pipe_t *pipe = new (std::nothrow) pipe_t (owner, this,
+ options.hwm, options.lwm);
+ zmq_assert (pipe);
+ out_pipe = &pipe->writer;
+ out_pipe->set_endpoint (this);
+ socket_reader = &pipe->reader;
+ }
+
+ if (options.requires_out && !in_pipe) {
+ pipe_t *pipe = new (std::nothrow) pipe_t (this, owner,
+ options.hwm, options.lwm);
+ zmq_assert (pipe);
+ in_pipe = &pipe->reader;
+ in_pipe->set_endpoint (this);
+ socket_writer = &pipe->writer;
+ }
+
+ if (socket_reader || socket_writer)
+ send_bind (owner, socket_reader, socket_writer, peer_identity);
+
+ // Plug in the engine.
zmq_assert (!engine);
zmq_assert (engine_);
engine = engine_;
engine->plug (this);
+
+ // Once the initial handshaking is over tracerouting should trim prefixes
+ // from outbound messages.
+ if (options.traceroute)
+ engine->trim_prefix ();
}