diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/session.cpp | 114 | ||||
-rw-r--r-- | src/session.hpp | 10 | ||||
-rw-r--r-- | src/socket_base.cpp | 25 | ||||
-rw-r--r-- | src/socket_base.hpp | 9 | ||||
-rw-r--r-- | src/zmq_init.cpp | 4 |
5 files changed, 93 insertions, 69 deletions
diff --git a/src/session.cpp b/src/session.cpp index c92fb0c..1fab3c2 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -32,9 +32,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, out_pipe (NULL), engine (NULL), options (options_) -{ - type = unnamed; - +{ // It's possible to register the session at this point as it will be // searched for only on reconnect, i.e. no race condition (session found // before it is plugged into it's I/O thread) is possible. @@ -52,13 +50,20 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, ordinal (0), options (options_) { + +if (!peer_identity_size_) + + // If peer identity is not supplied, leave it empty. if (peer_identity_size_) { - type = named; peer_identity.assign ((char*) peer_identity_, peer_identity_size_); - } - else { - type = transient; - // TODO: Generate unique name here. + if (!owner->register_session (peer_identity_size_, peer_identity_, + this)) { + + // TODO: There's already a session with the specified + // identity. We should presumably syslog it and drop the + // session. + zmq_assert (false); + } } } @@ -104,7 +109,7 @@ void zmq::session_t::detach (owned_t *reconnecter_) engine = NULL; // Terminate transient session. - if (type == transient) + if (!ordinal && peer_identity.empty ()) term (); } @@ -120,7 +125,6 @@ class zmq::socket_base_t *zmq::session_t::get_owner () uint64_t zmq::session_t::get_ordinal () { - zmq_assert (type == unnamed); zmq_assert (ordinal); return ordinal; } @@ -168,13 +172,62 @@ void zmq::session_t::revive (reader_t *pipe_) void zmq::session_t::process_plug () { - // Register the session with the socket. +} + +void zmq::session_t::process_unplug () +{ + // Unregister the session from the socket. + if (ordinal) + owner->unregister_session (ordinal); + else if (!peer_identity.empty ()) + owner->unregister_session ((unsigned char) peer_identity.size (), + (unsigned char*) peer_identity.data ()); + + // Ask associated pipes to terminate. + if (in_pipe) { + in_pipe->term (); + in_pipe = NULL; + } + if (out_pipe) { + out_pipe->term (); + out_pipe = NULL; + } + + if (engine) { + engine->unplug (); + delete engine; + engine = NULL; + } +} + +void zmq::session_t::process_attach (i_engine *engine_, + unsigned char peer_identity_size_, unsigned char *peer_identity_) +{ if (!peer_identity.empty ()) { - bool ok = owner->register_session (peer_identity.c_str (), this); - // There's already a session with the specified identity. - // We should syslog it and drop the session. TODO - zmq_assert (ok); + // If we already know the peer name do nothing, just check whether + // it haven't changed. + zmq_assert (peer_identity.size () == peer_identity_size_); + zmq_assert (memcmp (peer_identity.data (), peer_identity_, + peer_identity_size_) == 0); + } + else if (peer_identity_size_) { + + // Remember the peer identity. + peer_identity.assign ((char*) peer_identity_, peer_identity_size_); + + // If the session is not registered with the ordinal, let's register + // it using the peer name. + if (!ordinal) { + if (!owner->register_session (peer_identity_size_, peer_identity_, + this)) { + + // TODO: There's already a session with the specified + // identity. We should presumably syslog it and drop the + // session. + zmq_assert (false); + } + } } // If session is created by 'connect' function, it has the pipes set @@ -204,37 +257,8 @@ void zmq::session_t::process_plug () send_bind (owner, outbound ? &outbound->reader : NULL, inbound ? &inbound->writer : NULL); } -} - -void zmq::session_t::process_unplug () -{ - // Unregister the session from the socket. There's nothing to do here - // for transient sessions. - if (type == unnamed) - owner->unregister_session (ordinal); - else if (type == named) - owner->unregister_session (peer_identity.c_str ()); - // Ask associated pipes to terminate. - if (in_pipe) { - in_pipe->term (); - in_pipe = NULL; - } - if (out_pipe) { - out_pipe->term (); - out_pipe = NULL; - } - - if (engine) { - engine->unplug (); - delete engine; - engine = NULL; - } -} - -void zmq::session_t::process_attach (i_engine *engine_, - unsigned char peer_identity_size_, unsigned char *peer_identity_) -{ + // Plug in the engine. zmq_assert (!engine); zmq_assert (engine_); engine = engine_; diff --git a/src/session.hpp b/src/session.hpp index 9f8d5a8..7607cfb 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -81,20 +81,12 @@ namespace zmq struct i_engine *engine; - enum { - transient, - named, - unnamed - } type; - // Session is identified by ordinal in the case when it was created // before connection to the peer was established and thus we are // unaware of peer's identity. uint64_t ordinal; - // Identity of the peer. If the peer is anonymous, unique name is - // generated instead. Peer identity (or the generated name) is used - // register the session with socket-level repository of sessions. + // Identity of the peer. std::string peer_identity; // Inherited socket options. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 720e8cd..4af69a0 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -267,7 +267,7 @@ int zmq::socket_base_t::connect (const char *addr_) return -1; } - send_attach (session, pgm_sender); + send_attach (session, pgm_sender, 0, NULL); } else if (options.requires_in) { @@ -282,7 +282,7 @@ int zmq::socket_base_t::connect (const char *addr_) return -1; } - send_attach (session, pgm_receiver); + send_attach (session, pgm_receiver, 0, NULL); } else zmq_assert (false); @@ -454,30 +454,33 @@ bool zmq::socket_base_t::has_out () return xhas_out (); } -bool zmq::socket_base_t::register_session (const char *name_, - session_t *session_) +bool zmq::socket_base_t::register_session (unsigned char peer_identity_size_, + unsigned char *peer_identity_, session_t *session_) { sessions_sync.lock (); - bool registered = - named_sessions.insert (std::make_pair (name_, session_)).second; + bool registered = named_sessions.insert (std::make_pair (std::string ( + (char*) peer_identity_, peer_identity_size_), session_)).second; sessions_sync.unlock (); return registered; } -void zmq::socket_base_t::unregister_session (const char *name_) +void zmq::socket_base_t::unregister_session (unsigned char peer_identity_size_, + unsigned char *peer_identity_) { sessions_sync.lock (); - named_sessions_t::iterator it = named_sessions.find (name_); + named_sessions_t::iterator it = named_sessions.find (std::string ( + (char*) peer_identity_, peer_identity_size_)); zmq_assert (it != named_sessions.end ()); named_sessions.erase (it); sessions_sync.unlock (); } -zmq::session_t *zmq::socket_base_t::find_session (const char *name_) +zmq::session_t *zmq::socket_base_t::find_session ( + unsigned char peer_identity_size_, unsigned char *peer_identity_) { sessions_sync.lock (); - - named_sessions_t::iterator it = named_sessions.find (name_); + named_sessions_t::iterator it = named_sessions.find (std::string ( + (char*) peer_identity_, peer_identity_size_)); if (it == named_sessions.end ()) { sessions_sync.unlock (); return NULL; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 1ad9ed1..a2878ea 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -78,9 +78,12 @@ namespace zmq // There are two distinct types of sessions: those identified by name // and those identified by ordinal number. Thus two sets of session // management functions. - bool register_session (const char *name_, class session_t *session_); - void unregister_session (const char *name_); - class session_t *find_session (const char *name_); + bool register_session (unsigned char peer_identity_size_, + unsigned char *peer_identity_, class session_t *session_); + void unregister_session (unsigned char peer_identity_size_, + unsigned char *peer_identity_); + class session_t *find_session (unsigned char peer_identity_size_, + unsigned char *peer_identity_); uint64_t register_session (class session_t *session_); void unregister_session (uint64_t ordinal_); class session_t *find_session (uint64_t ordinal_); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index b853a64..f062ede 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -164,7 +164,9 @@ void zmq::zmq_init_t::finalise () // If the peer has a unique name, find the associated session. If it // doesn't exist, create it. else if (!peer_identity.empty ()) { - session = owner->find_session (peer_identity.c_str ()); + session = owner->find_session ( + (unsigned char) peer_identity.size (), + (unsigned char*) peer_identity.data ()); if (!session) { session = new (std::nothrow) session_t ( choose_io_thread (options.affinity), owner, options, |