diff options
Diffstat (limited to 'src/session.cpp')
-rw-r--r-- | src/session.cpp | 127 |
1 files changed, 70 insertions, 57 deletions
diff --git a/src/session.cpp b/src/session.cpp index 1aece4d..f86327e 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -32,9 +32,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, out_pipe (NULL), engine (NULL), options (options_) -{ - type = unnamed; - +{ // It's possible to register the session at this point as it will be // searched for only on reconnect, i.e. no race condition (session found // before it is plugged into it's I/O thread) is possible. @@ -42,23 +40,24 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, } zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, const char *name_) : + const options_t &options_, const blob_t &peer_identity_) : owned_t (parent_, owner_), in_pipe (NULL), active (true), out_pipe (NULL), engine (NULL), + ordinal (0), + peer_identity (peer_identity_), options (options_) { - if (name_) { - type = named; - name = name_; - ordinal = 0; - } - else { - type = transient; - // TODO: Generate unique name here. - ordinal = 0; + if (!peer_identity.empty ()) { + if (!owner->register_session (peer_identity, this)) { + + // TODO: There's already a session with the specified + // identity. We should presumably syslog it and drop the + // session. + zmq_assert (false); + } } } @@ -104,7 +103,7 @@ void zmq::session_t::detach (owned_t *reconnecter_) engine = NULL; // Terminate transient session. - if (type == transient) + if (!ordinal && peer_identity.empty ()) term (); } @@ -120,7 +119,6 @@ class zmq::socket_base_t *zmq::session_t::get_owner () uint64_t zmq::session_t::get_ordinal () { - zmq_assert (type == unnamed); zmq_assert (ordinal); return ordinal; } @@ -168,52 +166,15 @@ void zmq::session_t::revive (reader_t *pipe_) void zmq::session_t::process_plug () { - // Register the session with the socket. - if (!name.empty ()) { - bool ok = owner->register_session (name.c_str (), this); - - // There's already a session with the specified identity. - // We should syslog it and drop the session. TODO - zmq_assert (ok); - } - - // If session is created by 'connect' function, it has the pipes set - // already. Otherwise, it's being created by the listener and the pipes - // are yet to be created. - if (!in_pipe && !out_pipe) { - - pipe_t *inbound = NULL; - pipe_t *outbound = NULL; - - if (options.requires_out) { - inbound = new (std::nothrow) pipe_t (this, owner, - options.hwm, options.lwm); - zmq_assert (inbound); - in_pipe = &inbound->reader; - in_pipe->set_endpoint (this); - } - - if (options.requires_in) { - outbound = new (std::nothrow) pipe_t (owner, this, - options.hwm, options.lwm); - zmq_assert (outbound); - out_pipe = &outbound->writer; - out_pipe->set_endpoint (this); - } - - send_bind (owner, outbound ? &outbound->reader : NULL, - inbound ? &inbound->writer : NULL); - } } void zmq::session_t::process_unplug () { - // Unregister the session from the socket. There's nothing to do here - // for transient sessions. - if (type == unnamed) + // Unregister the session from the socket. + if (ordinal) owner->unregister_session (ordinal); - else if (type == named) - owner->unregister_session (name.c_str ()); + else if (!peer_identity.empty ()) + owner->unregister_session (peer_identity); // Ask associated pipes to terminate. if (in_pipe) { @@ -232,8 +193,60 @@ void zmq::session_t::process_unplug () } } -void zmq::session_t::process_attach (i_engine *engine_) +void zmq::session_t::process_attach (i_engine *engine_, + const blob_t &peer_identity_) { + if (!peer_identity.empty ()) { + + // If we already know the peer name do nothing, just check whether + // it haven't changed. + zmq_assert (peer_identity == peer_identity_); + } + else if (!peer_identity_.empty ()) { + + // Store the peer identity. + peer_identity = peer_identity_; + + // If the session is not registered with the ordinal, let's register + // it using the peer name. + if (!ordinal) { + if (!owner->register_session (peer_identity, this)) { + + // TODO: There's already a session with the specified + // identity. We should presumably syslog it and drop the + // session. + zmq_assert (false); + } + } + } + + // Check whether the required pipes already exist. If not so, we'll + // create them and bind them to the socket object. + reader_t *socket_reader = NULL; + writer_t *socket_writer = NULL; + + if (options.requires_in && !out_pipe) { + pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, + options.hwm, options.lwm); + zmq_assert (pipe); + out_pipe = &pipe->writer; + out_pipe->set_endpoint (this); + socket_reader = &pipe->reader; + } + + if (options.requires_out && !in_pipe) { + pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, + options.hwm, options.lwm); + zmq_assert (pipe); + in_pipe = &pipe->reader; + in_pipe->set_endpoint (this); + socket_writer = &pipe->writer; + } + + if (socket_reader || socket_writer) + send_bind (owner, socket_reader, socket_writer, peer_identity); + + // Plug in the engine. zmq_assert (!engine); zmq_assert (engine_); engine = engine_; |