summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/named_session.cpp21
-rw-r--r--src/session.cpp37
-rw-r--r--src/session.hpp9
-rw-r--r--src/socket_base.cpp12
-rw-r--r--src/socket_base.hpp7
-rw-r--r--src/zmq_init.cpp6
6 files changed, 37 insertions, 55 deletions
diff --git a/src/named_session.cpp b/src/named_session.cpp
index d219286..131ea6b 100644
--- a/src/named_session.cpp
+++ b/src/named_session.cpp
@@ -20,7 +20,6 @@
#include "named_session.hpp"
#include "socket_base.hpp"
-/*
zmq::named_session_t::named_session_t (class io_thread_t *io_thread_,
socket_base_t *socket_, const options_t &options_,
const blob_t &name_) :
@@ -46,31 +45,33 @@ zmq::named_session_t::~named_session_t ()
void zmq::named_session_t::detach ()
{
- // TODO:
- zmq_assert (false);
+ // Clean up the mess left over by the failed connection.
+ clean_pipes ();
+
+ // Do nothing. Wait till the connection comes up again.
}
void zmq::named_session_t::attached (const blob_t &peer_identity_)
{
- if (!peer_identity.empty ()) {
+ if (!name.empty ()) {
// If both IDs are temporary, no checking is needed.
// TODO: Old ID should be reused in this case...
- if (peer_identity.empty () || peer_identity [0] != 0 ||
+ if (name.empty () || name [0] != 0 ||
peer_identity_.empty () || peer_identity_ [0] != 0) {
// If we already know the peer name do nothing, just check whether
// it haven't changed.
- zmq_assert (peer_identity == peer_identity_);
+ zmq_assert (name == peer_identity_);
}
}
else if (!peer_identity_.empty ()) {
// Store the peer identity.
- peer_identity = peer_identity_;
+ name = peer_identity_;
// Register the session using the peer name.
- if (!register_session (peer_identity, this)) {
+ if (!register_session (name, this)) {
// TODO: There's already a session with the specified
// identity. We should presumably syslog it and drop the
@@ -82,6 +83,6 @@ void zmq::named_session_t::attached (const blob_t &peer_identity_)
void zmq::named_session_t::detached ()
{
- socket->unregister_session (peer_identity);
+ unregister_session (name);
}
-*/
+
diff --git a/src/session.cpp b/src/session.cpp
index ea264f1..fd6a7ee 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -163,31 +163,6 @@ void zmq::session_t::process_plug ()
{
}
-void zmq::session_t::process_unplug ()
-{
- // TODO: There may be a problem here. The called ensures that all the
- // commands on the fly have been delivered. However, given that the
- // session is unregistered from the global repository only at this point
- // there may be some commands being sent to the session right now.
-
- // Unregister the session from the socket.
-// if (!peer_identity.empty () && peer_identity [0] != 0)
-// unregister_session (peer_identity);
-// TODO: Should be done in named session.
-
- // Ask associated pipes to terminate.
- if (in_pipe)
- in_pipe->terminate ();
- if (out_pipe)
- out_pipe->terminate ();
-
- if (engine) {
- engine->unplug ();
- delete engine;
- engine = NULL;
- }
-}
-
void zmq::session_t::finalise ()
{
// If all conditions are met, proceed with termination:
@@ -221,7 +196,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
}
if (socket_reader || socket_writer)
- send_bind (socket, socket_reader, socket_writer, peer_identity);
+ send_bind (socket, socket_reader, socket_writer, peer_identity_);
// Plug in the engine.
zmq_assert (!engine);
@@ -252,6 +227,16 @@ void zmq::session_t::process_term ()
finalise ();
}
+bool zmq::session_t::register_session (const blob_t &name_, session_t *session_)
+{
+ return socket->register_session (name_, session_);
+}
+
+void zmq::session_t::unregister_session (const blob_t &name_)
+{
+ socket->unregister_session (name_);
+}
+
void zmq::session_t::attached (const blob_t &peer_identity_)
{
}
diff --git a/src/session.hpp b/src/session.hpp
index e009a90..38cf317 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -72,6 +72,10 @@ namespace zmq
virtual void attached (const blob_t &peer_identity_);
virtual void detached ();
+ // Allows derives session types to (un)register session names.
+ bool register_session (const blob_t &name_, class session_t *session_);
+ void unregister_session (const blob_t &name_);
+
~session_t ();
// Remove any half processed messages. Flush unflushed messages.
@@ -85,7 +89,6 @@ namespace zmq
// Handlers for incoming commands.
void process_plug ();
- void process_unplug ();
void process_attach (struct i_engine *engine_,
const blob_t &peer_identity_);
void process_term ();
@@ -110,10 +113,6 @@ namespace zmq
// The protocol I/O engine connected to the session.
struct i_engine *engine;
- // Identity of the peer (say the component on the other side
- // of TCP connection).
- blob_t peer_identity;
-
// The socket the session belongs to.
class socket_base_t *socket;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 0103618..76dfc46 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -564,29 +564,29 @@ bool zmq::socket_base_t::has_out ()
return xhas_out ();
}
-bool zmq::socket_base_t::register_session (const blob_t &peer_identity_,
+bool zmq::socket_base_t::register_session (const blob_t &name_,
session_t *session_)
{
sessions_sync.lock ();
bool registered = sessions.insert (
- std::make_pair (peer_identity_, session_)).second;
+ std::make_pair (name_, session_)).second;
sessions_sync.unlock ();
return registered;
}
-void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_)
+void zmq::socket_base_t::unregister_session (const blob_t &name_)
{
sessions_sync.lock ();
- sessions_t::iterator it = sessions.find (peer_identity_);
+ sessions_t::iterator it = sessions.find (name_);
zmq_assert (it != sessions.end ());
sessions.erase (it);
sessions_sync.unlock ();
}
-zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_)
+zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)
{
sessions_sync.lock ();
- sessions_t::iterator it = sessions.find (peer_identity_);
+ sessions_t::iterator it = sessions.find (name_);
if (it == sessions.end ()) {
sessions_sync.unlock ();
return NULL;
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 1d8c4ff..ce40d3f 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -71,10 +71,9 @@ namespace zmq
bool has_out ();
// Registry of named sessions.
- bool register_session (const blob_t &peer_identity_,
- class session_t *session_);
- void unregister_session (const blob_t &peer_identity_);
- class session_t *find_session (const blob_t &peer_identity_);
+ bool register_session (const blob_t &name_, class session_t *session_);
+ void unregister_session (const blob_t &name_);
+ class session_t *find_session (const blob_t &name_);
// i_reader_events interface implementation.
void activated (class reader_t *pipe_);
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index 68007a4..5bf6070 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -180,10 +180,8 @@ void zmq::zmq_init_t::finalise_initialisation ()
}
// There's no such named session. We have to create one.
-// TODO:
-zmq_assert (false);
-// session = new (std::nothrow) named_session_t (io_thread, socket,
-// options, peer_identity);
+ session = new (std::nothrow) named_session_t (io_thread, socket,
+ options, peer_identity);
zmq_assert (session);
launch_sibling (session);
engine->unplug ();