diff options
| -rw-r--r-- | src/session.cpp | 114 | ||||
| -rw-r--r-- | src/session.hpp | 10 | ||||
| -rw-r--r-- | src/socket_base.cpp | 25 | ||||
| -rw-r--r-- | src/socket_base.hpp | 9 | ||||
| -rw-r--r-- | src/zmq_init.cpp | 4 | 
5 files changed, 93 insertions, 69 deletions
diff --git a/src/session.cpp b/src/session.cpp index c92fb0c..1fab3c2 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -32,9 +32,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,      out_pipe (NULL),      engine (NULL),      options (options_) -{ -    type = unnamed; -     +{          //  It's possible to register the session at this point as it will be      //  searched for only on reconnect, i.e. no race condition (session found      //  before it is plugged into it's I/O thread) is possible. @@ -52,13 +50,20 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,      ordinal (0),      options (options_)  { + +if (!peer_identity_size_) + +    //  If peer identity is not supplied, leave it empty.      if (peer_identity_size_) { -        type = named;          peer_identity.assign ((char*) peer_identity_, peer_identity_size_); -    } -    else { -        type = transient; -        //  TODO: Generate unique name here. +        if (!owner->register_session (peer_identity_size_, peer_identity_, +              this)) { + +            //  TODO: There's already a session with the specified +            //  identity. We should presumably syslog it and drop the +            //  session. +            zmq_assert (false); +        }      }  } @@ -104,7 +109,7 @@ void zmq::session_t::detach (owned_t *reconnecter_)      engine = NULL;      //  Terminate transient session. -    if (type == transient) +    if (!ordinal && peer_identity.empty ())          term ();  } @@ -120,7 +125,6 @@ class zmq::socket_base_t *zmq::session_t::get_owner ()  uint64_t zmq::session_t::get_ordinal ()  { -    zmq_assert (type == unnamed);      zmq_assert (ordinal);      return ordinal;  } @@ -168,13 +172,62 @@ void zmq::session_t::revive (reader_t *pipe_)  void zmq::session_t::process_plug ()  { -    //  Register the session with the socket. +} + +void zmq::session_t::process_unplug () +{ +    //  Unregister the session from the socket. +    if (ordinal) +        owner->unregister_session (ordinal); +    else if (!peer_identity.empty ()) +        owner->unregister_session ((unsigned char) peer_identity.size (), +            (unsigned char*) peer_identity.data ()); + +    //  Ask associated pipes to terminate. +    if (in_pipe) { +        in_pipe->term (); +        in_pipe = NULL; +    } +    if (out_pipe) { +        out_pipe->term (); +        out_pipe = NULL; +    } + +    if (engine) { +        engine->unplug (); +        delete engine; +        engine = NULL; +    } +} + +void zmq::session_t::process_attach (i_engine *engine_, +    unsigned char peer_identity_size_, unsigned char *peer_identity_) +{      if (!peer_identity.empty ()) { -        bool ok = owner->register_session (peer_identity.c_str (), this); -        //  There's already a session with the specified identity. -        //  We should syslog it and drop the session. TODO -        zmq_assert (ok); +        //  If we already know the peer name do nothing, just check whether +        //  it haven't changed. +        zmq_assert (peer_identity.size () == peer_identity_size_); +        zmq_assert (memcmp (peer_identity.data (), peer_identity_, +            peer_identity_size_) == 0); +    } +    else if (peer_identity_size_) { + +        //  Remember the peer identity. +        peer_identity.assign ((char*) peer_identity_, peer_identity_size_); + +        //  If the session is not registered with the ordinal, let's register +        //  it using the peer name. +        if (!ordinal) { +            if (!owner->register_session (peer_identity_size_, peer_identity_, +                  this)) { + +                //  TODO: There's already a session with the specified +                //  identity. We should presumably syslog it and drop the +                //  session. +                zmq_assert (false); +            } +        }      }      //  If session is created by 'connect' function, it has the pipes set @@ -204,37 +257,8 @@ void zmq::session_t::process_plug ()          send_bind (owner, outbound ? &outbound->reader : NULL,              inbound ? &inbound->writer : NULL);      } -} - -void zmq::session_t::process_unplug () -{ -    //  Unregister the session from the socket. There's nothing to do here -    //  for transient sessions. -    if (type == unnamed) -        owner->unregister_session (ordinal); -    else if (type == named) -        owner->unregister_session (peer_identity.c_str ()); -    //  Ask associated pipes to terminate. -    if (in_pipe) { -        in_pipe->term (); -        in_pipe = NULL; -    } -    if (out_pipe) { -        out_pipe->term (); -        out_pipe = NULL; -    } - -    if (engine) { -        engine->unplug (); -        delete engine; -        engine = NULL; -    } -} - -void zmq::session_t::process_attach (i_engine *engine_, -    unsigned char peer_identity_size_, unsigned char *peer_identity_) -{ +    //  Plug in the engine.      zmq_assert (!engine);      zmq_assert (engine_);      engine = engine_; diff --git a/src/session.hpp b/src/session.hpp index 9f8d5a8..7607cfb 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -81,20 +81,12 @@ namespace zmq          struct i_engine *engine; -        enum { -            transient, -            named, -            unnamed -        } type; -          //  Session is identified by ordinal in the case when it was created          //  before connection to the peer was established and thus we are          //  unaware of peer's identity.          uint64_t ordinal; -        //  Identity of the peer. If the peer is anonymous, unique name is -        //  generated instead. Peer identity (or the generated name) is used -        //  register the session with socket-level repository of sessions. +        //  Identity of the peer.          std::string peer_identity;          //  Inherited socket options. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 720e8cd..4af69a0 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -267,7 +267,7 @@ int zmq::socket_base_t::connect (const char *addr_)                  return -1;              } -            send_attach (session, pgm_sender); +            send_attach (session, pgm_sender, 0, NULL);          }          else if (options.requires_in) { @@ -282,7 +282,7 @@ int zmq::socket_base_t::connect (const char *addr_)                  return -1;              } -            send_attach (session, pgm_receiver); +            send_attach (session, pgm_receiver, 0, NULL);          }          else              zmq_assert (false); @@ -454,30 +454,33 @@ bool zmq::socket_base_t::has_out ()      return xhas_out ();  } -bool zmq::socket_base_t::register_session (const char *name_, -    session_t *session_) +bool zmq::socket_base_t::register_session (unsigned char peer_identity_size_, +    unsigned char *peer_identity_, session_t *session_)  {      sessions_sync.lock (); -    bool registered = -        named_sessions.insert (std::make_pair (name_, session_)).second; +    bool registered = named_sessions.insert (std::make_pair (std::string ( +            (char*) peer_identity_, peer_identity_size_), session_)).second;      sessions_sync.unlock ();      return registered;  } -void zmq::socket_base_t::unregister_session (const char *name_) +void zmq::socket_base_t::unregister_session (unsigned char peer_identity_size_, +    unsigned char *peer_identity_)  {      sessions_sync.lock (); -    named_sessions_t::iterator it = named_sessions.find (name_); +    named_sessions_t::iterator it = named_sessions.find (std::string ( +        (char*) peer_identity_, peer_identity_size_));      zmq_assert (it != named_sessions.end ());      named_sessions.erase (it);      sessions_sync.unlock ();  } -zmq::session_t *zmq::socket_base_t::find_session (const char *name_) +zmq::session_t *zmq::socket_base_t::find_session ( +    unsigned char peer_identity_size_, unsigned char *peer_identity_)  {      sessions_sync.lock (); - -    named_sessions_t::iterator it = named_sessions.find (name_); +    named_sessions_t::iterator it = named_sessions.find (std::string ( +        (char*) peer_identity_, peer_identity_size_));      if (it == named_sessions.end ()) {          sessions_sync.unlock ();          return NULL; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 1ad9ed1..a2878ea 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -78,9 +78,12 @@ namespace zmq          //  There are two distinct types of sessions: those identified by name          //  and those identified by ordinal number. Thus two sets of session          //  management functions. -        bool register_session (const char *name_, class session_t *session_); -        void unregister_session (const char *name_); -        class session_t *find_session (const char *name_); +        bool register_session (unsigned char peer_identity_size_, +            unsigned char *peer_identity_, class session_t *session_); +        void unregister_session (unsigned char peer_identity_size_, +            unsigned char *peer_identity_); +        class session_t *find_session (unsigned char peer_identity_size_, +            unsigned char *peer_identity_);          uint64_t register_session (class session_t *session_);          void unregister_session (uint64_t ordinal_);          class session_t *find_session (uint64_t ordinal_); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index b853a64..f062ede 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -164,7 +164,9 @@ void zmq::zmq_init_t::finalise ()          //  If the peer has a unique name, find the associated session. If it          //  doesn't exist, create it.          else if (!peer_identity.empty ()) { -            session = owner->find_session (peer_identity.c_str ()); +            session = owner->find_session ( +                (unsigned char) peer_identity.size (), +                (unsigned char*) peer_identity.data ());              if (!session) {                  session = new (std::nothrow) session_t (                      choose_io_thread (options.affinity), owner, options,  | 
