summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp83
1 files changed, 58 insertions, 25 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index fde258c..43209d5 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -32,7 +32,6 @@
#include "session.hpp"
#include "config.hpp"
#include "owned.hpp"
-#include "uuid.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "platform.hpp"
@@ -46,7 +45,8 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
app_thread (parent_),
shutting_down (false),
sent_seqnum (0),
- processed_seqnum (0)
+ processed_seqnum (0),
+ next_ordinal (1)
{
}
@@ -114,10 +114,6 @@ int zmq::socket_base_t::bind (const char *addr_)
int zmq::socket_base_t::connect (const char *addr_)
{
- // Generate a unique name for the session.
- std::string session_name ("#");
- session_name += uuid_t ().to_string ();
-
// Parse addr_ string.
std::string addr_type;
std::string addr_args;
@@ -170,10 +166,10 @@ int zmq::socket_base_t::connect (const char *addr_)
return 0;
}
- // Create the session.
+ // Create unnamed session.
io_thread_t *io_thread = choose_io_thread (options.affinity);
- session_t *session = new (std::nothrow) session_t (io_thread, this,
- session_name.c_str (), options, true);
+ session_t *session = new (std::nothrow) session_t (io_thread,
+ this, options);
zmq_assert (session);
pipe_t *in_pipe = NULL;
@@ -213,7 +209,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// it is established.
zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
choose_io_thread (options.affinity), this, options,
- session_name.c_str (), false);
+ session->get_ordinal (), false);
zmq_assert (connecter);
int rc = connecter->set_address (addr_args.c_str ());
if (rc != 0) {
@@ -245,8 +241,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// PGM sender.
pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
- choose_io_thread (options.affinity), options,
- session_name.c_str ());
+ choose_io_thread (options.affinity), options);
zmq_assert (pgm_sender);
int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ());
@@ -261,8 +256,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// PGM receiver.
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
- choose_io_thread (options.affinity), options,
- session_name.c_str ());
+ choose_io_thread (options.affinity), options);
zmq_assert (pgm_receiver);
int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ());
@@ -408,7 +402,8 @@ int zmq::socket_base_t::close ()
// Check whether there are no session leaks.
sessions_sync.lock ();
- zmq_assert (sessions.empty ());
+ zmq_assert (named_sessions.empty ());
+ zmq_assert (unnamed_sessions.empty ());
sessions_sync.unlock ();
delete this;
@@ -445,36 +440,74 @@ bool zmq::socket_base_t::register_session (const char *name_,
session_t *session_)
{
sessions_sync.lock ();
- bool registered = sessions.insert (std::make_pair (name_, session_)).second;
+ bool registered =
+ named_sessions.insert (std::make_pair (name_, session_)).second;
sessions_sync.unlock ();
return registered;
}
-bool zmq::socket_base_t::unregister_session (const char *name_)
+void zmq::socket_base_t::unregister_session (const char *name_)
{
sessions_sync.lock ();
- sessions_t::iterator it = sessions.find (name_);
- bool unregistered = (it != sessions.end ());
- sessions.erase (it);
+ named_sessions_t::iterator it = named_sessions.find (name_);
+ zmq_assert (it != named_sessions.end ());
+ named_sessions.erase (it);
sessions_sync.unlock ();
- return unregistered;
}
zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
{
sessions_sync.lock ();
- sessions_t::iterator it = sessions.find (name_);
- if (it == sessions.end ()) {
+ named_sessions_t::iterator it = named_sessions.find (name_);
+ if (it == named_sessions.end ()) {
+ sessions_sync.unlock ();
+ return NULL;
+ }
+ session_t *session = it->second;
+
+ // Prepare the session for subsequent attach command.
+ session->inc_seqnum ();
+
+ sessions_sync.unlock ();
+ return session;
+}
+
+uint64_t zmq::socket_base_t::register_session (session_t *session_)
+{
+ sessions_sync.lock ();
+ uint64_t ordinal = next_ordinal;
+ next_ordinal++;
+ unnamed_sessions.insert (std::make_pair (ordinal, session_)).second;
+ sessions_sync.unlock ();
+ return ordinal;
+}
+
+void zmq::socket_base_t::unregister_session (uint64_t ordinal_)
+{
+ sessions_sync.lock ();
+ unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_);
+ zmq_assert (it != unnamed_sessions.end ());
+ unnamed_sessions.erase (it);
+ sessions_sync.unlock ();
+}
+
+zmq::session_t *zmq::socket_base_t::find_session (uint64_t ordinal_)
+{
+ sessions_sync.lock ();
+
+ unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_);
+ if (it == unnamed_sessions.end ()) {
sessions_sync.unlock ();
return NULL;
}
+ session_t *session = it->second;
// Prepare the session for subsequent attach command.
- it->second->inc_seqnum ();
+ session->inc_seqnum ();
sessions_sync.unlock ();
- return it->second;
+ return session;
}
void zmq::socket_base_t::kill (reader_t *pipe_)