summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-02-13 13:07:33 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-02-13 13:07:33 +0100
commitcdc2efe9b5f0d1f45065b1c32e5eabd7e9f78a12 (patch)
tree5d847b389419e06687683cd7fa5437b681ba0cc0 /src
parent923eacd28a725a6b32de588fe7a54dbe252d84aa (diff)
Multi-hop REQ/REP, part VII., identity-related algorithms rewritten
Diffstat (limited to 'src')
-rw-r--r--src/session.cpp114
-rw-r--r--src/session.hpp10
-rw-r--r--src/socket_base.cpp25
-rw-r--r--src/socket_base.hpp9
-rw-r--r--src/zmq_init.cpp4
5 files changed, 93 insertions, 69 deletions
diff --git a/src/session.cpp b/src/session.cpp
index c92fb0c..1fab3c2 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -32,9 +32,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
out_pipe (NULL),
engine (NULL),
options (options_)
-{
- type = unnamed;
-
+{
// It's possible to register the session at this point as it will be
// searched for only on reconnect, i.e. no race condition (session found
// before it is plugged into it's I/O thread) is possible.
@@ -52,13 +50,20 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
ordinal (0),
options (options_)
{
+
+if (!peer_identity_size_)
+
+ // If peer identity is not supplied, leave it empty.
if (peer_identity_size_) {
- type = named;
peer_identity.assign ((char*) peer_identity_, peer_identity_size_);
- }
- else {
- type = transient;
- // TODO: Generate unique name here.
+ if (!owner->register_session (peer_identity_size_, peer_identity_,
+ this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should presumably syslog it and drop the
+ // session.
+ zmq_assert (false);
+ }
}
}
@@ -104,7 +109,7 @@ void zmq::session_t::detach (owned_t *reconnecter_)
engine = NULL;
// Terminate transient session.
- if (type == transient)
+ if (!ordinal && peer_identity.empty ())
term ();
}
@@ -120,7 +125,6 @@ class zmq::socket_base_t *zmq::session_t::get_owner ()
uint64_t zmq::session_t::get_ordinal ()
{
- zmq_assert (type == unnamed);
zmq_assert (ordinal);
return ordinal;
}
@@ -168,13 +172,62 @@ void zmq::session_t::revive (reader_t *pipe_)
void zmq::session_t::process_plug ()
{
- // Register the session with the socket.
+}
+
+void zmq::session_t::process_unplug ()
+{
+ // Unregister the session from the socket.
+ if (ordinal)
+ owner->unregister_session (ordinal);
+ else if (!peer_identity.empty ())
+ owner->unregister_session ((unsigned char) peer_identity.size (),
+ (unsigned char*) peer_identity.data ());
+
+ // Ask associated pipes to terminate.
+ if (in_pipe) {
+ in_pipe->term ();
+ in_pipe = NULL;
+ }
+ if (out_pipe) {
+ out_pipe->term ();
+ out_pipe = NULL;
+ }
+
+ if (engine) {
+ engine->unplug ();
+ delete engine;
+ engine = NULL;
+ }
+}
+
+void zmq::session_t::process_attach (i_engine *engine_,
+ unsigned char peer_identity_size_, unsigned char *peer_identity_)
+{
if (!peer_identity.empty ()) {
- bool ok = owner->register_session (peer_identity.c_str (), this);
- // There's already a session with the specified identity.
- // We should syslog it and drop the session. TODO
- zmq_assert (ok);
+ // If we already know the peer name do nothing, just check whether
+ // it haven't changed.
+ zmq_assert (peer_identity.size () == peer_identity_size_);
+ zmq_assert (memcmp (peer_identity.data (), peer_identity_,
+ peer_identity_size_) == 0);
+ }
+ else if (peer_identity_size_) {
+
+ // Remember the peer identity.
+ peer_identity.assign ((char*) peer_identity_, peer_identity_size_);
+
+ // If the session is not registered with the ordinal, let's register
+ // it using the peer name.
+ if (!ordinal) {
+ if (!owner->register_session (peer_identity_size_, peer_identity_,
+ this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should presumably syslog it and drop the
+ // session.
+ zmq_assert (false);
+ }
+ }
}
// If session is created by 'connect' function, it has the pipes set
@@ -204,37 +257,8 @@ void zmq::session_t::process_plug ()
send_bind (owner, outbound ? &outbound->reader : NULL,
inbound ? &inbound->writer : NULL);
}
-}
-
-void zmq::session_t::process_unplug ()
-{
- // Unregister the session from the socket. There's nothing to do here
- // for transient sessions.
- if (type == unnamed)
- owner->unregister_session (ordinal);
- else if (type == named)
- owner->unregister_session (peer_identity.c_str ());
- // Ask associated pipes to terminate.
- if (in_pipe) {
- in_pipe->term ();
- in_pipe = NULL;
- }
- if (out_pipe) {
- out_pipe->term ();
- out_pipe = NULL;
- }
-
- if (engine) {
- engine->unplug ();
- delete engine;
- engine = NULL;
- }
-}
-
-void zmq::session_t::process_attach (i_engine *engine_,
- unsigned char peer_identity_size_, unsigned char *peer_identity_)
-{
+ // Plug in the engine.
zmq_assert (!engine);
zmq_assert (engine_);
engine = engine_;
diff --git a/src/session.hpp b/src/session.hpp
index 9f8d5a8..7607cfb 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -81,20 +81,12 @@ namespace zmq
struct i_engine *engine;
- enum {
- transient,
- named,
- unnamed
- } type;
-
// Session is identified by ordinal in the case when it was created
// before connection to the peer was established and thus we are
// unaware of peer's identity.
uint64_t ordinal;
- // Identity of the peer. If the peer is anonymous, unique name is
- // generated instead. Peer identity (or the generated name) is used
- // register the session with socket-level repository of sessions.
+ // Identity of the peer.
std::string peer_identity;
// Inherited socket options.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 720e8cd..4af69a0 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -267,7 +267,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
- send_attach (session, pgm_sender);
+ send_attach (session, pgm_sender, 0, NULL);
}
else if (options.requires_in) {
@@ -282,7 +282,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
- send_attach (session, pgm_receiver);
+ send_attach (session, pgm_receiver, 0, NULL);
}
else
zmq_assert (false);
@@ -454,30 +454,33 @@ bool zmq::socket_base_t::has_out ()
return xhas_out ();
}
-bool zmq::socket_base_t::register_session (const char *name_,
- session_t *session_)
+bool zmq::socket_base_t::register_session (unsigned char peer_identity_size_,
+ unsigned char *peer_identity_, session_t *session_)
{
sessions_sync.lock ();
- bool registered =
- named_sessions.insert (std::make_pair (name_, session_)).second;
+ bool registered = named_sessions.insert (std::make_pair (std::string (
+ (char*) peer_identity_, peer_identity_size_), session_)).second;
sessions_sync.unlock ();
return registered;
}
-void zmq::socket_base_t::unregister_session (const char *name_)
+void zmq::socket_base_t::unregister_session (unsigned char peer_identity_size_,
+ unsigned char *peer_identity_)
{
sessions_sync.lock ();
- named_sessions_t::iterator it = named_sessions.find (name_);
+ named_sessions_t::iterator it = named_sessions.find (std::string (
+ (char*) peer_identity_, peer_identity_size_));
zmq_assert (it != named_sessions.end ());
named_sessions.erase (it);
sessions_sync.unlock ();
}
-zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
+zmq::session_t *zmq::socket_base_t::find_session (
+ unsigned char peer_identity_size_, unsigned char *peer_identity_)
{
sessions_sync.lock ();
-
- named_sessions_t::iterator it = named_sessions.find (name_);
+ named_sessions_t::iterator it = named_sessions.find (std::string (
+ (char*) peer_identity_, peer_identity_size_));
if (it == named_sessions.end ()) {
sessions_sync.unlock ();
return NULL;
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 1ad9ed1..a2878ea 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -78,9 +78,12 @@ namespace zmq
// There are two distinct types of sessions: those identified by name
// and those identified by ordinal number. Thus two sets of session
// management functions.
- bool register_session (const char *name_, class session_t *session_);
- void unregister_session (const char *name_);
- class session_t *find_session (const char *name_);
+ bool register_session (unsigned char peer_identity_size_,
+ unsigned char *peer_identity_, class session_t *session_);
+ void unregister_session (unsigned char peer_identity_size_,
+ unsigned char *peer_identity_);
+ class session_t *find_session (unsigned char peer_identity_size_,
+ unsigned char *peer_identity_);
uint64_t register_session (class session_t *session_);
void unregister_session (uint64_t ordinal_);
class session_t *find_session (uint64_t ordinal_);
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index b853a64..f062ede 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -164,7 +164,9 @@ void zmq::zmq_init_t::finalise ()
// If the peer has a unique name, find the associated session. If it
// doesn't exist, create it.
else if (!peer_identity.empty ()) {
- session = owner->find_session (peer_identity.c_str ());
+ session = owner->find_session (
+ (unsigned char) peer_identity.size (),
+ (unsigned char*) peer_identity.data ());
if (!session) {
session = new (std::nothrow) session_t (
choose_io_thread (options.affinity), owner, options,