summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-12-23 19:37:56 +0100
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-12-23 19:37:56 +0100
commitaebff623f36efddc0de7a3192832b61802f8cec8 (patch)
treefd3c88417309994b72b5a33f152ba4b028930fa9
parentb3bd4c15fe869de4f5c530ecc5942968677a85c3 (diff)
ZMQII-28: Bidirectional introduction on TCP connection establishment
-rw-r--r--src/Makefile.am8
-rw-r--r--src/i_inout.hpp6
-rw-r--r--src/pgm_receiver.cpp3
-rw-r--r--src/pgm_receiver.hpp6
-rw-r--r--src/pgm_sender.cpp3
-rw-r--r--src/pgm_sender.hpp6
-rw-r--r--src/session.cpp54
-rw-r--r--src/session.hpp24
-rw-r--r--src/socket_base.cpp83
-rw-r--r--src/socket_base.hpp27
-rw-r--r--src/zmq_connecter.cpp15
-rw-r--r--src/zmq_connecter.hpp8
-rw-r--r--src/zmq_connecter_init.cpp132
-rw-r--r--src/zmq_connecter_init.hpp79
-rw-r--r--src/zmq_engine.cpp2
-rw-r--r--src/zmq_init.cpp195
-rw-r--r--src/zmq_init.hpp (renamed from src/zmq_listener_init.hpp)52
-rw-r--r--src/zmq_listener.cpp6
-rw-r--r--src/zmq_listener_init.cpp137
19 files changed, 387 insertions, 459 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 0fdaf37..0f47e7b 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -115,12 +115,11 @@ libzmq_la_SOURCES = app_thread.hpp \
ypollset.hpp \
yqueue.hpp \
zmq_connecter.hpp \
- zmq_connecter_init.hpp \
zmq_decoder.hpp \
zmq_encoder.hpp \
zmq_engine.hpp \
+ zmq_init.hpp \
zmq_listener.hpp \
- zmq_listener_init.hpp \
app_thread.cpp \
devpoll.cpp \
dispatcher.cpp \
@@ -161,12 +160,11 @@ libzmq_la_SOURCES = app_thread.hpp \
ypollset.cpp \
zmq.cpp \
zmq_connecter.cpp \
- zmq_connecter_init.cpp \
zmq_decoder.cpp \
zmq_encoder.cpp \
zmq_engine.cpp \
- zmq_listener.cpp \
- zmq_listener_init.cpp
+ zmq_init.cpp \
+ zmq_listener.cpp
libzmq_la_LDFLAGS = -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAFS@
diff --git a/src/i_inout.hpp b/src/i_inout.hpp
index b82a476..8a0ce6a 100644
--- a/src/i_inout.hpp
+++ b/src/i_inout.hpp
@@ -22,6 +22,8 @@
#include "../bindings/c/zmq.h"
+#include "stdint.hpp"
+
namespace zmq
{
@@ -47,8 +49,8 @@ namespace zmq
// Return pointer to the owning socket.
virtual class socket_base_t *get_owner () = 0;
- // Returns the name of associated session.
- virtual const char *get_session_name () = 0;
+ // Return ordinal number of the session.
+ virtual uint64_t get_ordinal () = 0;
};
}
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 2a24858..1d4d695 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -37,11 +37,10 @@
#include "i_inout.hpp"
zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
- const options_t &options_, const char *session_name_) :
+ const options_t &options_) :
io_object_t (parent_),
pgm_socket (true, options_),
options (options_),
- session_name (session_name_),
inout (NULL)
{
}
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index fa84acb..91169f4 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -47,8 +47,7 @@ namespace zmq
// Creates gm_engine. Underlying PGM connection is initialised
// using network_ parameter.
- pgm_receiver_t (class io_thread_t *parent_, const options_t &options_,
- const char *session_name_);
+ pgm_receiver_t (class io_thread_t *parent_, const options_t &options_);
~pgm_receiver_t ();
int init (bool udp_encapsulation_, const char *network_);
@@ -94,9 +93,6 @@ namespace zmq
// Socket options.
options_t options;
- // Name of the session associated with the connecter.
- std::string session_name;
-
// Parent session.
i_inout *inout;
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 676ed93..880bb09 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -33,12 +33,11 @@
#include "wire.hpp"
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
- const options_t &options_, const char *session_name_) :
+ const options_t &options_) :
io_object_t (parent_),
encoder (0, false),
pgm_socket (false, options_),
options (options_),
- session_name (session_name_),
inout (NULL),
out_buffer (NULL),
out_buffer_size (0),
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 9a9844c..30b545d 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -42,8 +42,7 @@ namespace zmq
{
public:
- pgm_sender_t (class io_thread_t *parent_, const options_t &options_,
- const char *session_name_);
+ pgm_sender_t (class io_thread_t *parent_, const options_t &options_);
~pgm_sender_t ();
int init (bool udp_encapsulation_, const char *network_);
@@ -74,9 +73,6 @@ namespace zmq
// Socket options.
options_t options;
- // Name of the session associated with the connecter.
- std::string session_name;
-
// Poll handle associated with PGM socket.
handle_t handle;
handle_t uplink_handle;
diff --git a/src/session.cpp b/src/session.cpp
index 37f2720..a17e205 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -25,16 +25,41 @@
#include "pipe.hpp"
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
- const char *name_, const options_t &options_, bool reconnect_) :
+ const options_t &options_) :
owned_t (parent_, owner_),
in_pipe (NULL),
active (true),
out_pipe (NULL),
engine (NULL),
- name (name_),
- options (options_),
- reconnect (reconnect_)
+ options (options_)
{
+ type = unnamed;
+
+ // It's possible to register the session at this point as it will be
+ // searched for only on reconnect, i.e. no race condition (session found
+ // before it is plugged into it's I/O thread) is possible.
+ ordinal = owner->register_session (this);
+}
+
+zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
+ const options_t &options_, const char *name_) :
+ owned_t (parent_, owner_),
+ in_pipe (NULL),
+ active (true),
+ out_pipe (NULL),
+ engine (NULL),
+ options (options_)
+{
+ if (name_) {
+ type = named;
+ name = name_;
+ ordinal = 0;
+ }
+ else {
+ type = transient;
+ // TODO: Generate unique name here.
+ ordinal = 0;
+ }
}
zmq::session_t::~session_t ()
@@ -78,8 +103,8 @@ void zmq::session_t::detach (owned_t *reconnecter_)
// Engine is terminating itself. No need to deallocate it from here.
engine = NULL;
- // In the case od anonymous connection, terminate the session.
- if (name.empty ())
+ // Terminate transient session.
+ if (type == transient)
term ();
}
@@ -93,9 +118,11 @@ class zmq::socket_base_t *zmq::session_t::get_owner ()
return owner;
}
-const char *zmq::session_t::get_session_name ()
+uint64_t zmq::session_t::get_ordinal ()
{
- return name.c_str ();
+ zmq_assert (type == unnamed);
+ zmq_assert (ordinal);
+ return ordinal;
}
void zmq::session_t::attach_pipes (class reader_t *inpipe_,
@@ -181,11 +208,12 @@ void zmq::session_t::process_plug ()
void zmq::session_t::process_unplug ()
{
- // Unregister the session from the socket.
- if (!name.empty ()) {
- bool ok = owner->unregister_session (name.c_str ());
- zmq_assert (ok);
- }
+ // Unregister the session from the socket. There's nothing to do here
+ // for transient sessions.
+ if (type == unnamed)
+ owner->unregister_session (ordinal);
+ else if (type == named)
+ owner->unregister_session (name.c_str ());
// Ask associated pipes to terminate.
if (in_pipe) {
diff --git a/src/session.hpp b/src/session.hpp
index 72e1d59..c60cfc7 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -34,8 +34,14 @@ namespace zmq
{
public:
- session_t (object_t *parent_, socket_base_t *owner_, const char *name_,
- const options_t &options_, bool reconnect_);
+ // Creates unnamed session.
+ session_t (object_t *parent_, socket_base_t *owner_,
+ const options_t &options_);
+
+ // Creates named session. If name is NULL, transient session with
+ // auto-generated name is created.
+ session_t (object_t *parent_, socket_base_t *owner_,
+ const options_t &options_, const char *name_);
// i_inout interface implementation.
bool read (::zmq_msg_t *msg_);
@@ -44,7 +50,7 @@ namespace zmq
void detach (owned_t *reconnecter_);
class io_thread_t *get_io_thread ();
class socket_base_t *get_owner ();
- const char *get_session_name ();
+ uint64_t get_ordinal ();
// i_endpoint interface implementation.
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
@@ -73,6 +79,15 @@ namespace zmq
struct i_engine *engine;
+ enum {
+ transient,
+ named,
+ unnamed
+ } type;
+
+ // Ordinal of the session (if any).
+ uint64_t ordinal;
+
// The name of the session. One that is used to register it with
// socket-level repository of sessions.
std::string name;
@@ -80,9 +95,6 @@ namespace zmq
// Inherited socket options.
options_t options;
- // If true, reconnection is required after connection breaks.
- bool reconnect;
-
session_t (const session_t&);
void operator = (const session_t&);
};
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index fde258c..43209d5 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -32,7 +32,6 @@
#include "session.hpp"
#include "config.hpp"
#include "owned.hpp"
-#include "uuid.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "platform.hpp"
@@ -46,7 +45,8 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
app_thread (parent_),
shutting_down (false),
sent_seqnum (0),
- processed_seqnum (0)
+ processed_seqnum (0),
+ next_ordinal (1)
{
}
@@ -114,10 +114,6 @@ int zmq::socket_base_t::bind (const char *addr_)
int zmq::socket_base_t::connect (const char *addr_)
{
- // Generate a unique name for the session.
- std::string session_name ("#");
- session_name += uuid_t ().to_string ();
-
// Parse addr_ string.
std::string addr_type;
std::string addr_args;
@@ -170,10 +166,10 @@ int zmq::socket_base_t::connect (const char *addr_)
return 0;
}
- // Create the session.
+ // Create unnamed session.
io_thread_t *io_thread = choose_io_thread (options.affinity);
- session_t *session = new (std::nothrow) session_t (io_thread, this,
- session_name.c_str (), options, true);
+ session_t *session = new (std::nothrow) session_t (io_thread,
+ this, options);
zmq_assert (session);
pipe_t *in_pipe = NULL;
@@ -213,7 +209,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// it is established.
zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
choose_io_thread (options.affinity), this, options,
- session_name.c_str (), false);
+ session->get_ordinal (), false);
zmq_assert (connecter);
int rc = connecter->set_address (addr_args.c_str ());
if (rc != 0) {
@@ -245,8 +241,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// PGM sender.
pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
- choose_io_thread (options.affinity), options,
- session_name.c_str ());
+ choose_io_thread (options.affinity), options);
zmq_assert (pgm_sender);
int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ());
@@ -261,8 +256,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// PGM receiver.
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
- choose_io_thread (options.affinity), options,
- session_name.c_str ());
+ choose_io_thread (options.affinity), options);
zmq_assert (pgm_receiver);
int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ());
@@ -408,7 +402,8 @@ int zmq::socket_base_t::close ()
// Check whether there are no session leaks.
sessions_sync.lock ();
- zmq_assert (sessions.empty ());
+ zmq_assert (named_sessions.empty ());
+ zmq_assert (unnamed_sessions.empty ());
sessions_sync.unlock ();
delete this;
@@ -445,36 +440,74 @@ bool zmq::socket_base_t::register_session (const char *name_,
session_t *session_)
{
sessions_sync.lock ();
- bool registered = sessions.insert (std::make_pair (name_, session_)).second;
+ bool registered =
+ named_sessions.insert (std::make_pair (name_, session_)).second;
sessions_sync.unlock ();
return registered;
}
-bool zmq::socket_base_t::unregister_session (const char *name_)
+void zmq::socket_base_t::unregister_session (const char *name_)
{
sessions_sync.lock ();
- sessions_t::iterator it = sessions.find (name_);
- bool unregistered = (it != sessions.end ());
- sessions.erase (it);
+ named_sessions_t::iterator it = named_sessions.find (name_);
+ zmq_assert (it != named_sessions.end ());
+ named_sessions.erase (it);
sessions_sync.unlock ();
- return unregistered;
}
zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
{
sessions_sync.lock ();
- sessions_t::iterator it = sessions.find (name_);
- if (it == sessions.end ()) {
+ named_sessions_t::iterator it = named_sessions.find (name_);
+ if (it == named_sessions.end ()) {
+ sessions_sync.unlock ();
+ return NULL;
+ }
+ session_t *session = it->second;
+
+ // Prepare the session for subsequent attach command.
+ session->inc_seqnum ();
+
+ sessions_sync.unlock ();
+ return session;
+}
+
+uint64_t zmq::socket_base_t::register_session (session_t *session_)
+{
+ sessions_sync.lock ();
+ uint64_t ordinal = next_ordinal;
+ next_ordinal++;
+ unnamed_sessions.insert (std::make_pair (ordinal, session_)).second;
+ sessions_sync.unlock ();
+ return ordinal;
+}
+
+void zmq::socket_base_t::unregister_session (uint64_t ordinal_)
+{
+ sessions_sync.lock ();
+ unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_);
+ zmq_assert (it != unnamed_sessions.end ());
+ unnamed_sessions.erase (it);
+ sessions_sync.unlock ();
+}
+
+zmq::session_t *zmq::socket_base_t::find_session (uint64_t ordinal_)
+{
+ sessions_sync.lock ();
+
+ unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_);
+ if (it == unnamed_sessions.end ()) {
sessions_sync.unlock ();
return NULL;
}
+ session_t *session = it->second;
// Prepare the session for subsequent attach command.
- it->second->inc_seqnum ();
+ session->inc_seqnum ();
sessions_sync.unlock ();
- return it->second;
+ return session;
}
void zmq::socket_base_t::kill (reader_t *pipe_)
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 79a8340..16553ea 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -34,6 +34,7 @@
#include "options.hpp"
#include "stdint.hpp"
#include "atomic_counter.hpp"
+#include "stdint.hpp"
namespace zmq
{
@@ -74,9 +75,15 @@ namespace zmq
// commands as it is unacceptable to wait for the completion of the
// action till user application yields control of the application
// thread to 0MQ. Locking is used instead.
+ // There are two distinct types of sessions: those identified by name
+ // and those identified by ordinal number. Thus two sets of session
+ // management functions.
bool register_session (const char *name_, class session_t *session_);
- bool unregister_session (const char *name_);
+ void unregister_session (const char *name_);
class session_t *find_session (const char *name_);
+ uint64_t register_session (class session_t *session_);
+ void unregister_session (uint64_t ordinal_);
+ class session_t *find_session (uint64_t ordinal_);
// i_endpoint interface implementation.
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
@@ -144,15 +151,15 @@ namespace zmq
// Sequence number of the last command processed by this object.
uint64_t processed_seqnum;
- // List of existing sessions. This list is never referenced from within
- // the socket, instead it is used by I/O objects owned by the session.
- // As those objects can live in different threads, the access is
- // synchronised using 'sessions_sync' mutex.
- // Local sessions are those named by the local instance of 0MQ.
- // Remote sessions are the sessions who's identities are provided by
- // the remote party.
- typedef std::map <std::string, session_t*> sessions_t;
- sessions_t sessions;
+ // Lists of existing sessions. This lists are never referenced from
+ // within the socket, instead they are used by I/O objects owned by
+ // the socket. As those objects can live in different threads,
+ // the access is synchronised by mutex.
+ typedef std::map <std::string, session_t*> named_sessions_t;
+ named_sessions_t named_sessions;
+ typedef std::map <uint64_t, session_t*> unnamed_sessions_t;
+ unnamed_sessions_t unnamed_sessions;
+ uint64_t next_ordinal;
mutex_t sessions_sync;
socket_base_t (const socket_base_t&);
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index 5bda48d..8f95fc0 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -20,19 +20,20 @@
#include <new>
#include "zmq_connecter.hpp"
-#include "zmq_connecter_init.hpp"
+#include "zmq_engine.hpp"
+#include "zmq_init.hpp"
#include "io_thread.hpp"
#include "err.hpp"
zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_,
socket_base_t *owner_, const options_t &options_,
- const char *session_name_, bool wait_) :
+ uint64_t session_ordinal_, bool wait_) :
owned_t (parent_, owner_),
io_object_t (parent_),
handle_valid (false),
wait (wait_),
- options (options_),
- session_name (session_name_)
+ session_ordinal (session_ordinal_),
+ options (options_)
{
}
@@ -88,9 +89,9 @@ void zmq::zmq_connecter_t::out_event ()
}
// Create an init object.
- io_thread_t *io_thread = choose_io_thread (options.affinity);
- zmq_connecter_init_t *init = new (std::nothrow) zmq_connecter_init_t (
- io_thread, owner, fd, options, session_name.c_str (), address.c_str ());
+ zmq_init_t *init = new (std::nothrow) zmq_init_t (
+ choose_io_thread (options.affinity), owner,
+ fd, options, true, address.c_str (), session_ordinal);
zmq_assert (init);
send_plug (init);
send_own (owner, init);
diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp
index acd3352..e5b4a70 100644
--- a/src/zmq_connecter.hpp
+++ b/src/zmq_connecter.hpp
@@ -36,7 +36,7 @@ namespace zmq
public:
zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_,
- const options_t &options_, const char *session_name_, bool wait_);
+ const options_t &options_, uint64_t session_ordinal_, bool wait_);
~zmq_connecter_t ();
// Set IP address to connect to.
@@ -69,12 +69,12 @@ namespace zmq
// If true, connecter is waiting a while before trying to connect.
bool wait;
+ // Ordinal of the session to attach to.
+ uint64_t session_ordinal;
+
// Associated socket options.
options_t options;
- // Name of the session associated with the connecter.
- std::string session_name;
-
// Address to connect to.
std::string address;
diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp
deleted file mode 100644
index f8436a3..0000000
--- a/src/zmq_connecter_init.cpp
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <new>
-
-#include "zmq_connecter_init.hpp"
-#include "zmq_connecter.hpp"
-#include "io_thread.hpp"
-#include "session.hpp"
-#include "err.hpp"
-
-zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_,
- socket_base_t *owner_, fd_t fd_, const options_t &options_,
- const char *session_name_, const char *address_) :
- owned_t (parent_, owner_),
- options (options_),
- session_name (session_name_)
-{
- // Create associated engine object.
- engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, true,
- address_);
- zmq_assert (engine);
-}
-
-zmq::zmq_connecter_init_t::~zmq_connecter_init_t ()
-{
- if (engine)
- delete engine;
-}
-
-bool zmq::zmq_connecter_init_t::read (::zmq_msg_t *msg_)
-{
- // Send identity.
- int rc = zmq_msg_init_size (msg_, options.identity.size ());
- zmq_assert (rc == 0);
- memcpy (zmq_msg_data (msg_), options.identity.c_str (),
- options.identity.size ());
-
- // Initialisation is done at this point. Disconnect the engine from
- // the init object.
- engine->unplug ();
-
- // Find the session associated with this connecter. If it doesn't exist
- // drop the newly created connection. If it does, attach it to the
- // connection.
- session_t *session = NULL;
- if (!session_name.empty ())
- session = owner->find_session (session_name.c_str ());
- if (!session) {
-
- // TODO:
- // The socket is already closing. The session is already shut down,
- // so no point in continuing with connecting. Shut the connection down.
- zmq_assert (false);
- }
-
- // No need to increment seqnum as it was alredy incremented above.
- send_attach (session, engine, false);
- engine = NULL;
-
- // Destroy the init object.
- term ();
-
- return true;
-}
-
-bool zmq::zmq_connecter_init_t::write (::zmq_msg_t *msg_)
-{
- return false;
-}
-
-void zmq::zmq_connecter_init_t::flush ()
-{
- // We are not expecting any messages. No point in flushing.
-}
-
-void zmq::zmq_connecter_init_t::detach (owned_t *reconnecter_)
-{
- // Plug in the reconnecter object.
- zmq_assert (reconnecter_);
- send_plug (reconnecter_);
- send_own (owner, reconnecter_);
-
- // This function is called by engine when disconnection occurs.
- // The engine will destroy itself, so we just drop the pointer here and
- // start termination of the init object.
- engine = NULL;
- term ();
-}
-
-zmq::io_thread_t *zmq::zmq_connecter_init_t::get_io_thread ()
-{
- return choose_io_thread (options.affinity);
-}
-
-class zmq::socket_base_t *zmq::zmq_connecter_init_t::get_owner ()
-{
- return owner;
-}
-
-const char *zmq::zmq_connecter_init_t::get_session_name ()
-{
- return session_name.c_str ();
-}
-
-void zmq::zmq_connecter_init_t::process_plug ()
-{
- zmq_assert (engine);
- engine->plug (this);
-}
-
-void zmq::zmq_connecter_init_t::process_unplug ()
-{
- if (engine)
- engine->unplug ();
-}
diff --git a/src/zmq_connecter_init.hpp b/src/zmq_connecter_init.hpp
deleted file mode 100644
index 03ccd24..0000000
--- a/src/zmq_connecter_init.hpp
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_ZMQ_CONNECTER_INIT_HPP_INCLUDED__
-#define __ZMQ_ZMQ_CONNECTER_INIT_HPP_INCLUDED__
-
-#include <string>
-
-#include "i_inout.hpp"
-#include "owned.hpp"
-#include "zmq_engine.hpp"
-#include "stdint.hpp"
-#include "fd.hpp"
-#include "options.hpp"
-
-namespace zmq
-{
-
- // The class handles initialisation phase of native 0MQ wire-level
- // protocol on the connecting side of the connection.
-
- class zmq_connecter_init_t : public owned_t, public i_inout
- {
- public:
-
- zmq_connecter_init_t (class io_thread_t *parent_, socket_base_t *owner_,
- fd_t fd_, const options_t &options, const char *session_name_,
- const char *address_);
- ~zmq_connecter_init_t ();
-
- private:
-
- // i_inout interface implementation.
- bool read (::zmq_msg_t *msg_);
- bool write (::zmq_msg_t *msg_);
- void flush ();
- void detach (owned_t *reconnecter_);
- class io_thread_t *get_io_thread ();
- class socket_base_t *get_owner ();
- const char *get_session_name ();
-
- // Handlers for incoming commands.
- void process_plug ();
- void process_unplug ();
-
- // Engine is created by zmq_connecter_init_t object. Once the
- // initialisation phase is over it is passed to a session object,
- // possibly running in a different I/O thread.
- zmq_engine_t *engine;
-
- // Associated socket options.
- options_t options;
-
- // Name of the session to bind new connection to.
- std::string session_name;
-
- zmq_connecter_init_t (const zmq_connecter_init_t&);
- void operator = (const zmq_connecter_init_t&);
- };
-
-}
-
-#endif
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index c2878a7..cfc87a7 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -159,7 +159,7 @@ void zmq::zmq_engine_t::error ()
// Ask it to wait for a while before reconnecting.
reconnecter = new (std::nothrow) zmq_connecter_t (
inout->get_io_thread (), inout->get_owner (),
- options, inout->get_session_name (), true);
+ options, inout->get_ordinal (), true);
zmq_assert (reconnecter);
reconnecter->set_address (address.c_str ());
}
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
new file mode 100644
index 0000000..e526b34
--- /dev/null
+++ b/src/zmq_init.cpp
@@ -0,0 +1,195 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "zmq_init.hpp"
+#include "zmq_engine.hpp"
+#include "io_thread.hpp"
+#include "session.hpp"
+#include "err.hpp"
+
+zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
+ fd_t fd_, const options_t &options_, bool reconnect_,
+ const char *address_, uint64_t session_ordinal_) :
+ owned_t (parent_, owner_),
+ sent (false),
+ received (false),
+ session_ordinal (session_ordinal_),
+ options (options_)
+{
+ // Create the engine object for this connection.
+ engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options,
+ reconnect_, address_);
+ zmq_assert (engine);
+}
+
+zmq::zmq_init_t::~zmq_init_t ()
+{
+ if (engine)
+ delete engine;
+}
+
+bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
+{
+ // If the identity was already sent, do nothing.
+ if (sent)
+ return false;
+
+ // Send the identity.
+ int rc = zmq_msg_init_size (msg_, options.identity.size ());
+ zmq_assert (rc == 0);
+ memcpy (zmq_msg_data (msg_), options.identity.c_str (),
+ options.identity.size ());
+ sent = true;
+
+ // If initialisation is done, pass the engine to the session and
+ // destroy the init object.
+ finalise ();
+
+ return true;
+}
+
+bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
+{
+ // If identity was already received, we are not interested
+ // in subsequent messages.
+ if (received)
+ return false;
+
+ // Retreieve the remote identity.
+ peer_identity.assign ((const char*) zmq_msg_data (msg_),
+ zmq_msg_size (msg_));
+ received = true;
+
+ return true;
+}
+
+void zmq::zmq_init_t::flush ()
+{
+ // Check if there's anything to flush.
+ if (!received)
+ return;
+
+ // If initialisation is done, pass the engine to the session and
+ // destroy the init object.
+ finalise ();
+}
+
+void zmq::zmq_init_t::detach (owned_t *reconnecter_)
+{
+ // This function is called by engine when disconnection occurs.
+
+ // If required, launch the reconnecter.
+ if (reconnecter_) {
+ send_plug (reconnecter_);
+ send_own (owner, reconnecter_);
+ }
+
+ // The engine will destroy itself, so let's just drop the pointer here and
+ // start termination of the init object.
+ engine = NULL;
+ term ();
+}
+
+zmq::io_thread_t *zmq::zmq_init_t::get_io_thread ()
+{
+ return choose_io_thread (options.affinity);
+}
+
+class zmq::socket_base_t *zmq::zmq_init_t::get_owner ()
+{
+ return owner;
+}
+
+uint64_t zmq::zmq_init_t::get_ordinal ()
+{
+ zmq_assert (false);
+}
+
+void zmq::zmq_init_t::process_plug ()
+{
+ zmq_assert (engine);
+ engine->plug (this);
+}
+
+void zmq::zmq_init_t::process_unplug ()
+{
+ if (engine)
+ engine->unplug ();
+}
+
+void zmq::zmq_init_t::finalise ()
+{
+ if (sent && received) {
+
+ // Disconnect the engine from the init object.
+ engine->unplug ();
+
+ session_t *session = NULL;
+
+ // If we have the session ordinal, let's use it to find the session.
+ // If it is not found, it means socket is already being shut down
+ // and the session have been deallocated.
+ // TODO: We should check whether the name of the peer haven't changed
+ // upon reconnection.
+ if (session_ordinal) {
+ session = owner->find_session (session_ordinal);
+ if (!session) {
+ term ();
+ return;
+ }
+ }
+
+ // If the peer has a unique name, find the associated session. If it
+ // doesn't exist, create it.
+ else if (!peer_identity.empty ()) {
+ session = owner->find_session (peer_identity.c_str ());
+ if (!session) {
+ session = new (std::nothrow) session_t (
+ choose_io_thread (options.affinity), owner, options,
+ peer_identity.c_str ());
+ zmq_assert (session);
+ send_plug (session);
+ send_own (owner, session);
+
+ // Reserve a sequence number for following 'attach' command.
+ session->inc_seqnum ();
+ }
+ }
+
+ // If the other party has no specific identity, let's create a
+ // transient session.
+ else {
+ session = new (std::nothrow) session_t (
+ choose_io_thread (options.affinity), owner, options, NULL);
+ zmq_assert (session);
+ send_plug (session);
+ send_own (owner, session);
+
+ // Reserve a sequence number for following 'attach' command.
+ session->inc_seqnum ();
+ }
+
+ // No need to increment seqnum as it was laready incremented above.
+ send_attach (session, engine, false);
+
+ // Destroy the init object.
+ engine = NULL;
+ term ();
+ }
+}
diff --git a/src/zmq_listener_init.hpp b/src/zmq_init.hpp
index d7fde02..a17d621 100644
--- a/src/zmq_listener_init.hpp
+++ b/src/zmq_init.hpp
@@ -17,34 +17,37 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_ZMQ_LISTENER_INIT_HPP_INCLUDED__
-#define __ZMQ_ZMQ_LISTENER_INIT_HPP_INCLUDED__
+#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
+#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#include <string>
#include "i_inout.hpp"
+#include "i_engine.hpp"
#include "owned.hpp"
-#include "zmq_engine.hpp"
-#include "stdint.hpp"
#include "fd.hpp"
+#include "stdint.hpp"
#include "options.hpp"
+#include "stdint.hpp"
namespace zmq
{
- // The class handles initialisation phase of native 0MQ wire-level
- // protocol on the listening side of the connection.
+ // The class handles initialisation phase of 0MQ wire-level protocol.
- class zmq_listener_init_t : public owned_t, public i_inout
+ class zmq_init_t : public owned_t, public i_inout
{
public:
- zmq_listener_init_t (class io_thread_t *parent_, socket_base_t *owner_,
- fd_t fd_, const options_t &options);
- ~zmq_listener_init_t ();
+ zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_,
+ fd_t fd_, const options_t &options_, bool reconnect_,
+ const char *address_, uint64_t session_ordinal_);
+ ~zmq_init_t ();
private:
+ void finalise ();
+
// i_inout interface implementation.
bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_);
@@ -52,26 +55,33 @@ namespace zmq
void detach (owned_t *reconnecter_);
class io_thread_t *get_io_thread ();
class socket_base_t *get_owner ();
- const char *get_session_name ();
+ uint64_t get_ordinal ();
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
- // Engine is created by zmq_listener_init_t object. Once the
- // initialisation phase is over it is passed to a session object,
- // possibly running in a different I/O thread.
- zmq_engine_t *engine;
+ // Associated wite-protocol engine.
+ i_engine *engine;
- // Associated socket options.
- options_t options;
+ // True if our own identity was already sent to the peer.
+ bool sent;
- // Indetity on the other end of the connection.
- bool has_peer_identity;
+ // True if peer's identity was already received.
+ bool received;
+
+ // Identity of the peer socket.
std::string peer_identity;
- zmq_listener_init_t (const zmq_listener_init_t&);
- void operator = (const zmq_listener_init_t&);
+ // TCP connecter creates session before the name of the peer is known.
+ // Thus we know only its ordinal number.
+ uint64_t session_ordinal;
+
+ // Associated socket options.
+ options_t options;
+
+ zmq_init_t (const zmq_init_t&);
+ void operator = (const zmq_init_t&);
};
}
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index 5c7552b..6a7e2fd 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -20,7 +20,7 @@
#include <new>
#include "zmq_listener.hpp"
-#include "zmq_listener_init.hpp"
+#include "zmq_init.hpp"
#include "io_thread.hpp"
#include "err.hpp"
@@ -64,8 +64,8 @@ void zmq::zmq_listener_t::in_event ()
// Create an init object.
io_thread_t *io_thread = choose_io_thread (options.affinity);
- zmq_listener_init_t *init = new (std::nothrow) zmq_listener_init_t (
- io_thread, owner, fd, options);
+ zmq_init_t *init = new (std::nothrow) zmq_init_t (
+ io_thread, owner, fd, options, false, NULL, 0);
zmq_assert (init);
send_plug (init);
send_own (owner, init);
diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp
deleted file mode 100644
index f7b3001..0000000
--- a/src/zmq_listener_init.cpp
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <new>
-
-#include "zmq_listener_init.hpp"
-#include "io_thread.hpp"
-#include "session.hpp"
-#include "err.hpp"
-
-zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_,
- socket_base_t *owner_, fd_t fd_, const options_t &options_) :
- owned_t (parent_, owner_),
- options (options_),
- has_peer_identity (false)
-{
- // Create associated engine object.
- engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options,
- false, NULL);
- zmq_assert (engine);
-}
-
-zmq::zmq_listener_init_t::~zmq_listener_init_t ()
-{
- if (engine)
- delete engine;
-}
-
-bool zmq::zmq_listener_init_t::read (::zmq_msg_t *msg_)
-{
- return false;
-}
-
-bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
-{
- // Once we've got peer's identity we aren't interested in subsequent
- // messages.
- if (has_peer_identity)
- return false;
-
- // Retreieve the remote identity. We'll use it as a local session name.
- has_peer_identity = true;
- peer_identity.assign ((const char*) zmq_msg_data (msg_),
- zmq_msg_size (msg_));
-
- return true;
-}
-
-void zmq::zmq_listener_init_t::flush ()
-{
- if (!has_peer_identity)
- return;
-
- // Initialisation is done. Disconnect the engine from the init object.
- engine->unplug ();
-
- // Have a look whether the session already exists. If it does, attach it
- // to the engine. If it doesn't create it first.
- session_t *session = NULL;
- if (!peer_identity.empty ())
- session = owner->find_session (peer_identity.c_str ());
- if (!session) {
- io_thread_t *io_thread = choose_io_thread (options.affinity);
- session = new (std::nothrow) session_t (io_thread, owner,
- peer_identity.c_str (), options, false);
- zmq_assert (session);
- send_plug (session);
- send_own (owner, session);
-
- // Reserve a sequence number for following 'attach' command.
- session->inc_seqnum ();
- }
-
- // No need to increment seqnum as it was laready incremented above.
- send_attach (session, engine, false);
-
- engine = NULL;
-
- // Destroy the init object.
- term ();
-}
-
-void zmq::zmq_listener_init_t::detach (owned_t *reconnecter_)
-{
- // On the listening side of the connection we are never reconnecting.
- zmq_assert (reconnecter_ == NULL);
-
- // This function is called by engine when disconnection occurs.
- // The engine will destroy itself, so we just drop the pointer here and
- // start termination of the init object.
- engine = NULL;
- term ();
-}
-
-zmq::io_thread_t *zmq::zmq_listener_init_t::get_io_thread ()
-{
- return choose_io_thread (options.affinity);
-}
-
-class zmq::socket_base_t *zmq::zmq_listener_init_t::get_owner ()
-{
- return owner;
-}
-
-const char *zmq::zmq_listener_init_t::get_session_name ()
-{
- zmq_assert (false);
- return NULL;
-}
-
-void zmq::zmq_listener_init_t::process_plug ()
-{
- zmq_assert (engine);
- engine->plug (this);
-}
-
-void zmq::zmq_listener_init_t::process_unplug ()
-{
- if (engine)
- engine->unplug ();
-}