From 9b8993efb48ccbe3df917338cc078129b5af495b Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 24 Aug 2010 11:19:22 +0200 Subject: elementary fixes to the named session --- src/named_session.cpp | 21 +++++++++++---------- src/session.cpp | 37 +++++++++++-------------------------- src/session.hpp | 9 ++++----- src/socket_base.cpp | 12 ++++++------ src/socket_base.hpp | 7 +++---- src/zmq_init.cpp | 6 ++---- 6 files changed, 37 insertions(+), 55 deletions(-) (limited to 'src') diff --git a/src/named_session.cpp b/src/named_session.cpp index d219286..131ea6b 100644 --- a/src/named_session.cpp +++ b/src/named_session.cpp @@ -20,7 +20,6 @@ #include "named_session.hpp" #include "socket_base.hpp" -/* zmq::named_session_t::named_session_t (class io_thread_t *io_thread_, socket_base_t *socket_, const options_t &options_, const blob_t &name_) : @@ -46,31 +45,33 @@ zmq::named_session_t::~named_session_t () void zmq::named_session_t::detach () { - // TODO: - zmq_assert (false); + // Clean up the mess left over by the failed connection. + clean_pipes (); + + // Do nothing. Wait till the connection comes up again. } void zmq::named_session_t::attached (const blob_t &peer_identity_) { - if (!peer_identity.empty ()) { + if (!name.empty ()) { // If both IDs are temporary, no checking is needed. // TODO: Old ID should be reused in this case... - if (peer_identity.empty () || peer_identity [0] != 0 || + if (name.empty () || name [0] != 0 || peer_identity_.empty () || peer_identity_ [0] != 0) { // If we already know the peer name do nothing, just check whether // it haven't changed. - zmq_assert (peer_identity == peer_identity_); + zmq_assert (name == peer_identity_); } } else if (!peer_identity_.empty ()) { // Store the peer identity. - peer_identity = peer_identity_; + name = peer_identity_; // Register the session using the peer name. - if (!register_session (peer_identity, this)) { + if (!register_session (name, this)) { // TODO: There's already a session with the specified // identity. We should presumably syslog it and drop the @@ -82,6 +83,6 @@ void zmq::named_session_t::attached (const blob_t &peer_identity_) void zmq::named_session_t::detached () { - socket->unregister_session (peer_identity); + unregister_session (name); } -*/ + diff --git a/src/session.cpp b/src/session.cpp index ea264f1..fd6a7ee 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -163,31 +163,6 @@ void zmq::session_t::process_plug () { } -void zmq::session_t::process_unplug () -{ - // TODO: There may be a problem here. The called ensures that all the - // commands on the fly have been delivered. However, given that the - // session is unregistered from the global repository only at this point - // there may be some commands being sent to the session right now. - - // Unregister the session from the socket. -// if (!peer_identity.empty () && peer_identity [0] != 0) -// unregister_session (peer_identity); -// TODO: Should be done in named session. - - // Ask associated pipes to terminate. - if (in_pipe) - in_pipe->terminate (); - if (out_pipe) - out_pipe->terminate (); - - if (engine) { - engine->unplug (); - delete engine; - engine = NULL; - } -} - void zmq::session_t::finalise () { // If all conditions are met, proceed with termination: @@ -221,7 +196,7 @@ void zmq::session_t::process_attach (i_engine *engine_, } if (socket_reader || socket_writer) - send_bind (socket, socket_reader, socket_writer, peer_identity); + send_bind (socket, socket_reader, socket_writer, peer_identity_); // Plug in the engine. zmq_assert (!engine); @@ -252,6 +227,16 @@ void zmq::session_t::process_term () finalise (); } +bool zmq::session_t::register_session (const blob_t &name_, session_t *session_) +{ + return socket->register_session (name_, session_); +} + +void zmq::session_t::unregister_session (const blob_t &name_) +{ + socket->unregister_session (name_); +} + void zmq::session_t::attached (const blob_t &peer_identity_) { } diff --git a/src/session.hpp b/src/session.hpp index e009a90..38cf317 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -72,6 +72,10 @@ namespace zmq virtual void attached (const blob_t &peer_identity_); virtual void detached (); + // Allows derives session types to (un)register session names. + bool register_session (const blob_t &name_, class session_t *session_); + void unregister_session (const blob_t &name_); + ~session_t (); // Remove any half processed messages. Flush unflushed messages. @@ -85,7 +89,6 @@ namespace zmq // Handlers for incoming commands. void process_plug (); - void process_unplug (); void process_attach (struct i_engine *engine_, const blob_t &peer_identity_); void process_term (); @@ -110,10 +113,6 @@ namespace zmq // The protocol I/O engine connected to the session. struct i_engine *engine; - // Identity of the peer (say the component on the other side - // of TCP connection). - blob_t peer_identity; - // The socket the session belongs to. class socket_base_t *socket; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 0103618..76dfc46 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -564,29 +564,29 @@ bool zmq::socket_base_t::has_out () return xhas_out (); } -bool zmq::socket_base_t::register_session (const blob_t &peer_identity_, +bool zmq::socket_base_t::register_session (const blob_t &name_, session_t *session_) { sessions_sync.lock (); bool registered = sessions.insert ( - std::make_pair (peer_identity_, session_)).second; + std::make_pair (name_, session_)).second; sessions_sync.unlock (); return registered; } -void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_) +void zmq::socket_base_t::unregister_session (const blob_t &name_) { sessions_sync.lock (); - sessions_t::iterator it = sessions.find (peer_identity_); + sessions_t::iterator it = sessions.find (name_); zmq_assert (it != sessions.end ()); sessions.erase (it); sessions_sync.unlock (); } -zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_) +zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_) { sessions_sync.lock (); - sessions_t::iterator it = sessions.find (peer_identity_); + sessions_t::iterator it = sessions.find (name_); if (it == sessions.end ()) { sessions_sync.unlock (); return NULL; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 1d8c4ff..ce40d3f 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -71,10 +71,9 @@ namespace zmq bool has_out (); // Registry of named sessions. - bool register_session (const blob_t &peer_identity_, - class session_t *session_); - void unregister_session (const blob_t &peer_identity_); - class session_t *find_session (const blob_t &peer_identity_); + bool register_session (const blob_t &name_, class session_t *session_); + void unregister_session (const blob_t &name_); + class session_t *find_session (const blob_t &name_); // i_reader_events interface implementation. void activated (class reader_t *pipe_); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index 68007a4..5bf6070 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -180,10 +180,8 @@ void zmq::zmq_init_t::finalise_initialisation () } // There's no such named session. We have to create one. -// TODO: -zmq_assert (false); -// session = new (std::nothrow) named_session_t (io_thread, socket, -// options, peer_identity); + session = new (std::nothrow) named_session_t (io_thread, socket, + options, peer_identity); zmq_assert (session); launch_sibling (session); engine->unplug (); -- cgit v1.2.3