summaryrefslogtreecommitdiff
path: root/src/session.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-02-13 13:07:33 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-02-13 13:07:33 +0100
commitcdc2efe9b5f0d1f45065b1c32e5eabd7e9f78a12 (patch)
tree5d847b389419e06687683cd7fa5437b681ba0cc0 /src/session.cpp
parent923eacd28a725a6b32de588fe7a54dbe252d84aa (diff)
Multi-hop REQ/REP, part VII., identity-related algorithms rewritten
Diffstat (limited to 'src/session.cpp')
-rw-r--r--src/session.cpp114
1 files changed, 69 insertions, 45 deletions
diff --git a/src/session.cpp b/src/session.cpp
index c92fb0c..1fab3c2 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -32,9 +32,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
out_pipe (NULL),
engine (NULL),
options (options_)
-{
- type = unnamed;
-
+{
// It's possible to register the session at this point as it will be
// searched for only on reconnect, i.e. no race condition (session found
// before it is plugged into it's I/O thread) is possible.
@@ -52,13 +50,20 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
ordinal (0),
options (options_)
{
+
+if (!peer_identity_size_)
+
+ // If peer identity is not supplied, leave it empty.
if (peer_identity_size_) {
- type = named;
peer_identity.assign ((char*) peer_identity_, peer_identity_size_);
- }
- else {
- type = transient;
- // TODO: Generate unique name here.
+ if (!owner->register_session (peer_identity_size_, peer_identity_,
+ this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should presumably syslog it and drop the
+ // session.
+ zmq_assert (false);
+ }
}
}
@@ -104,7 +109,7 @@ void zmq::session_t::detach (owned_t *reconnecter_)
engine = NULL;
// Terminate transient session.
- if (type == transient)
+ if (!ordinal && peer_identity.empty ())
term ();
}
@@ -120,7 +125,6 @@ class zmq::socket_base_t *zmq::session_t::get_owner ()
uint64_t zmq::session_t::get_ordinal ()
{
- zmq_assert (type == unnamed);
zmq_assert (ordinal);
return ordinal;
}
@@ -168,13 +172,62 @@ void zmq::session_t::revive (reader_t *pipe_)
void zmq::session_t::process_plug ()
{
- // Register the session with the socket.
+}
+
+void zmq::session_t::process_unplug ()
+{
+ // Unregister the session from the socket.
+ if (ordinal)
+ owner->unregister_session (ordinal);
+ else if (!peer_identity.empty ())
+ owner->unregister_session ((unsigned char) peer_identity.size (),
+ (unsigned char*) peer_identity.data ());
+
+ // Ask associated pipes to terminate.
+ if (in_pipe) {
+ in_pipe->term ();
+ in_pipe = NULL;
+ }
+ if (out_pipe) {
+ out_pipe->term ();
+ out_pipe = NULL;
+ }
+
+ if (engine) {
+ engine->unplug ();
+ delete engine;
+ engine = NULL;
+ }
+}
+
+void zmq::session_t::process_attach (i_engine *engine_,
+ unsigned char peer_identity_size_, unsigned char *peer_identity_)
+{
if (!peer_identity.empty ()) {
- bool ok = owner->register_session (peer_identity.c_str (), this);
- // There's already a session with the specified identity.
- // We should syslog it and drop the session. TODO
- zmq_assert (ok);
+ // If we already know the peer name do nothing, just check whether
+ // it haven't changed.
+ zmq_assert (peer_identity.size () == peer_identity_size_);
+ zmq_assert (memcmp (peer_identity.data (), peer_identity_,
+ peer_identity_size_) == 0);
+ }
+ else if (peer_identity_size_) {
+
+ // Remember the peer identity.
+ peer_identity.assign ((char*) peer_identity_, peer_identity_size_);
+
+ // If the session is not registered with the ordinal, let's register
+ // it using the peer name.
+ if (!ordinal) {
+ if (!owner->register_session (peer_identity_size_, peer_identity_,
+ this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should presumably syslog it and drop the
+ // session.
+ zmq_assert (false);
+ }
+ }
}
// If session is created by 'connect' function, it has the pipes set
@@ -204,37 +257,8 @@ void zmq::session_t::process_plug ()
send_bind (owner, outbound ? &outbound->reader : NULL,
inbound ? &inbound->writer : NULL);
}
-}
-
-void zmq::session_t::process_unplug ()
-{
- // Unregister the session from the socket. There's nothing to do here
- // for transient sessions.
- if (type == unnamed)
- owner->unregister_session (ordinal);
- else if (type == named)
- owner->unregister_session (peer_identity.c_str ());
- // Ask associated pipes to terminate.
- if (in_pipe) {
- in_pipe->term ();
- in_pipe = NULL;
- }
- if (out_pipe) {
- out_pipe->term ();
- out_pipe = NULL;
- }
-
- if (engine) {
- engine->unplug ();
- delete engine;
- engine = NULL;
- }
-}
-
-void zmq::session_t::process_attach (i_engine *engine_,
- unsigned char peer_identity_size_, unsigned char *peer_identity_)
-{
+ // Plug in the engine.
zmq_assert (!engine);
zmq_assert (engine_);
engine = engine_;