From 6be4b0143793ab5ceebc5d9d6bbe5c2f1333a0d2 Mon Sep 17 00:00:00 2001
From: Martin Sustrik <sustrik@fastmq.commkdir>
Date: Fri, 21 Aug 2009 14:29:22 +0200
Subject: session management implemented

---
 include/zmq.h              |  28 ++++++------
 include/zmq.hpp            |  11 +++--
 src/Makefile.am            |  10 ++--
 src/i_inout.hpp            |   4 +-
 src/msg.hpp                |  49 --------------------
 src/msg_content.hpp        |  50 ++++++++++++++++++++
 src/object.cpp             |   5 +-
 src/object.hpp             |   4 ++
 src/owned.cpp              |  10 +++-
 src/owned.hpp              |   4 ++
 src/pipe.hpp               |   2 +-
 src/session.cpp            |  39 ++++++++++++----
 src/session.hpp            |  15 ++++--
 src/socket_base.cpp        |  77 +++++++++++++++++++++----------
 src/socket_base.hpp        |  24 ++++++----
 src/zmq.cpp                | 111 +++++++++++++++++++++++---------------------
 src/zmq_connecter.cpp      |  11 +++--
 src/zmq_connecter.hpp      |  10 ++--
 src/zmq_connecter_init.cpp |  94 +++++++++++++++++++++++++++++++++++++
 src/zmq_connecter_init.hpp |  75 ++++++++++++++++++++++++++++++
 src/zmq_decoder.hpp        |   2 +-
 src/zmq_encoder.hpp        |   2 +-
 src/zmq_init.cpp           | 112 ---------------------------------------------
 src/zmq_init.hpp           |  82 ---------------------------------
 src/zmq_listener.cpp       |   5 +-
 src/zmq_listener_init.cpp  |  96 ++++++++++++++++++++++++++++++++++++++
 src/zmq_listener_init.hpp  |  71 ++++++++++++++++++++++++++++
 27 files changed, 621 insertions(+), 382 deletions(-)
 delete mode 100644 src/msg.hpp
 create mode 100644 src/msg_content.hpp
 create mode 100644 src/zmq_connecter_init.cpp
 create mode 100644 src/zmq_connecter_init.hpp
 delete mode 100644 src/zmq_init.cpp
 delete mode 100644 src/zmq_init.hpp
 create mode 100644 src/zmq_listener_init.cpp
 create mode 100644 src/zmq_listener_init.hpp

diff --git a/include/zmq.h b/include/zmq.h
index 46d779b..34ce80c 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -94,50 +94,50 @@ typedef void (zmq_free_fn) (void *data);
 //  is shared, i.e. reference counting is used to manage its lifetime
 //  rather than straighforward malloc/free. struct zmq_msg_content is
 //  not declared in the API.
-struct zmq_msg
+struct zmq_msg_t
 {
-    struct zmq_msg_content *content;
+    void *content;
     unsigned char shared;
     uint16_t vsm_size;
     unsigned char vsm_data [ZMQ_MAX_VSM_SIZE];
 };
 
 //  Initialise an empty message (zero bytes long).
-ZMQ_EXPORT int zmq_msg_init (zmq_msg *msg);
+ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg);
 
 //  Initialise a message 'size' bytes long.
 //
 //  Errors: ENOMEM - the size is too large to allocate.
-ZMQ_EXPORT int zmq_msg_init_size (zmq_msg *msg, size_t size);
+ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
 
 //  Initialise a message from an existing buffer. Message isn't copied,
 //  instead 0SOCKETS infrastructure take ownership of the buffer and call
 //  deallocation functio (ffn) once it's not needed anymore.
-ZMQ_EXPORT int zmq_msg_init_data (zmq_msg *msg, void *data, size_t size,
+ZMQ_EXPORT int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size,
     zmq_free_fn *ffn);
 
 //  Deallocate the message.
-ZMQ_EXPORT int zmq_msg_close (zmq_msg *msg);
+ZMQ_EXPORT int zmq_msg_close (zmq_msg_t *msg);
 
 //  Move the content of the message from 'src' to 'dest'. The content isn't
 //  copied, just moved. 'src' is an empty message after the call. Original
 //  content of 'dest' message is deallocated.
-ZMQ_EXPORT int zmq_msg_move (zmq_msg *dest, zmq_msg *src);
+ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
 
 //  Copy the 'src' message to 'dest'. The content isn't copied, instead
 //  reference count is increased. Don't modify the message data after the
 //  call as they are shared between two messages. Original content of 'dest'
 //  message is deallocated.
-ZMQ_EXPORT int zmq_msg_copy (zmq_msg *dest, zmq_msg *src);
+ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
 
 //  Returns pointer to message data.
-ZMQ_EXPORT void *zmq_msg_data (zmq_msg *msg);
+ZMQ_EXPORT void *zmq_msg_data (zmq_msg_t *msg);
 
 //  Return size of message data (in bytes).
-ZMQ_EXPORT size_t zmq_msg_size (zmq_msg *msg);
+ZMQ_EXPORT size_t zmq_msg_size (zmq_msg_t *msg);
 
 //  Returns type of the message.
-ZMQ_EXPORT int zmq_msg_type (zmq_msg *msg);
+ZMQ_EXPORT int zmq_msg_type (zmq_msg_t *msg);
 
 //  Initialise 0SOCKETS context. 'app_threads' specifies maximal number
 //  of application threads that can have open sockets at the same time.
@@ -163,7 +163,7 @@ ZMQ_EXPORT int zmq_close (void *s);
 
 //  Sets an option on the socket.
 //  EINVAL - unknown option, a value with incorrect length or an invalid value.
-ZMQ_EXPORT int zmq_setsockopt (void *s, int option_, void *optval_,
+ZMQ_EXPORT int zmq_setsockopt (void *s, int option_, const void *optval_,
     size_t optvallen_); 
 
 //  Bind the socket to a particular address.
@@ -182,7 +182,7 @@ ZMQ_EXPORT int zmq_connect (void *s, const char *addr);
 //  Errors: EAGAIN - message cannot be sent at the moment (applies only to
 //                   non-blocking send).
 //          ENOTSUP - function isn't supported by particular socket type.
-ZMQ_EXPORT int zmq_send (void *s, zmq_msg *msg, int flags);
+ZMQ_EXPORT int zmq_send (void *s, zmq_msg_t *msg, int flags);
 
 //  Flush the messages that were send using ZMQ_NOFLUSH flag down the stream.
 //
@@ -196,7 +196,7 @@ ZMQ_EXPORT int zmq_flush (void *s);
 //  Errors: EAGAIN - message cannot be received at the moment (applies only to
 //                   non-blocking receive).
 //          ENOTSUP - function isn't supported by particular socket type.
-ZMQ_EXPORT int zmq_recv (void *s, zmq_msg *msg, int flags);
+ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags);
 
 #ifdef __cplusplus
 }
diff --git a/include/zmq.hpp b/include/zmq.hpp
index e8e4f15..8397464 100644
--- a/include/zmq.hpp
+++ b/include/zmq.hpp
@@ -74,7 +74,7 @@ namespace zmq
     //  copied it - the behaviour is undefined. Don't change the body of the
     //  received message either - other threads may be accessing it in parallel.
 
-    class message_t : private zmq_msg
+    class message_t : private zmq_msg_t
     {
         friend class socket_t;
 
@@ -139,7 +139,7 @@ namespace zmq
         //  of data after the operation.
         inline void move_to (message_t *msg_)
         {
-            int rc = zmq_msg_move (this, (zmq_msg*) msg_);
+            int rc = zmq_msg_move (this, (zmq_msg_t*) msg_);
             assert (rc == 0);
         }
 
@@ -148,7 +148,7 @@ namespace zmq
         //  these get deallocated.
         inline void copy_to (message_t *msg_)
         {
-            int rc = zmq_msg_copy (this, (zmq_msg*) msg_);
+            int rc = zmq_msg_copy (this, (zmq_msg_t*) msg_);
             assert (rc == 0);
         }
 
@@ -230,9 +230,10 @@ namespace zmq
             assert (rc == 0);
         }
 
-        template <typename T> inline void setsockopt (int option_, T &value_)
+        inline void setsockopt (int option_, const void *optval_,
+            size_t optvallen_)
         {
-            int rc = zmq_setsockopt (ptr, option_, (void*) &value_, sizeof (T));
+            int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_);
             assert (rc == 0);
         }
 
diff --git a/src/Makefile.am b/src/Makefile.am
index 9d6127c..396e3a3 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -23,7 +23,7 @@ libzmq_la_SOURCES = \
     i_poll_events.hpp \
     i_signaler.hpp \
     kqueue.hpp \
-    msg.hpp \
+    msg_content.hpp \
     mutex.hpp \
     object.hpp \
     options.hpp \
@@ -47,11 +47,12 @@ libzmq_la_SOURCES = \
     ypollset.hpp \
     yqueue.hpp \
     zmq_connecter.hpp \
+    zmq_connecter_init.hpp \
     zmq_decoder.hpp \
     zmq_encoder.hpp \
     zmq_engine.hpp \
-    zmq_init.hpp \
     zmq_listener.hpp \
+    zmq_listener_init.hpp \
     app_thread.cpp \
     devpoll.cpp \
     dispatcher.cpp \
@@ -77,11 +78,12 @@ libzmq_la_SOURCES = \
     ypollset.cpp \
     zmq.cpp \
     zmq_connecter.cpp \
+    zmq_connecter_init.cpp \
     zmq_decoder.cpp \
     zmq_encoder.cpp \
     zmq_engine.cpp \
-    zmq_init.cpp \
-    zmq_listener.cpp
+    zmq_listener.cpp \
+    zmq_listener_init.cpp
 
 libzmq_la_LDFLAGS = -version-info 0:0:0
 libzmq_la_CXXFLAGS = -Wall -pedantic -Werror @ZMQ_EXTRA_CXXFLAGS@
diff --git a/src/i_inout.hpp b/src/i_inout.hpp
index be2e007..8901c04 100644
--- a/src/i_inout.hpp
+++ b/src/i_inout.hpp
@@ -27,8 +27,8 @@ namespace zmq
 
     struct i_inout
     {
-        virtual bool read (::zmq_msg *msg_) = 0;
-        virtual bool write (::zmq_msg *msg_) = 0;
+        virtual bool read (::zmq_msg_t *msg_) = 0;
+        virtual bool write (::zmq_msg_t *msg_) = 0;
         virtual void flush () = 0;
     };
 
diff --git a/src/msg.hpp b/src/msg.hpp
deleted file mode 100644
index 4f35961..0000000
--- a/src/msg.hpp
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
-    Copyright (c) 2007-2009 FastMQ Inc.
-
-    This file is part of 0MQ.
-
-    0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    0MQ is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
-
-    You should have received a copy of the Lesser GNU General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_MSG_HPP_INCLUDE__
-#define __ZMQ_MSG_HPP_INCLUDE__
-
-#include <stddef.h>
-
-#include "../include/zmq.h"
-
-#include "atomic_counter.hpp"
-
-//namespace zmq
-//{
-
-    //  Shared message buffer. Message data are either allocated in one
-    //  continuous block along with this structure - thus avoiding one
-    //  malloc/free pair or they are stored in used-supplied memory.
-    //  In the latter case, ffn member stores pointer to the function to be
-    //  used to deallocate the data. If the buffer is actually shared (there
-    //  are at least 2 references to it) refcount member contains number of
-    //  references.
-    struct zmq_msg_content
-    {
-        void *data;
-        size_t size;
-        zmq_free_fn *ffn;
-        zmq::atomic_counter_t refcnt;
-    };
-
-//}
-
-#endif
diff --git a/src/msg_content.hpp b/src/msg_content.hpp
new file mode 100644
index 0000000..b468746
--- /dev/null
+++ b/src/msg_content.hpp
@@ -0,0 +1,50 @@
+/*
+    Copyright (c) 2007-2009 FastMQ Inc.
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_MSG_HPP_INCLUDE__
+#define __ZMQ_MSG_HPP_INCLUDE__
+
+#include <stddef.h>
+
+#include "../include/zmq.h"
+
+#include "atomic_counter.hpp"
+
+namespace zmq
+{
+
+    //  Shared message buffer. Message data are either allocated in one
+    //  continuous block along with this structure - thus avoiding one
+    //  malloc/free pair or they are stored in used-supplied memory.
+    //  In the latter case, ffn member stores pointer to the function to be
+    //  used to deallocate the data. If the buffer is actually shared (there
+    //  are at least 2 references to it) refcount member contains number of
+    //  references.
+
+    struct msg_content_t
+    {
+        void *data;
+        size_t size;
+        zmq_free_fn *ffn;
+        zmq::atomic_counter_t refcnt;
+    };
+
+}
+
+#endif
diff --git a/src/object.cpp b/src/object.cpp
index b1b7c8a..0a25750 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -131,9 +131,8 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
 
 void zmq::object_t::send_attach (session_t *destination_, zmq_engine_t *engine_)
 {
-    //  Let the object know that it cannot shut down till it gets this command.
-    destination_->inc_seqnum ();
-
+    //  The assumption here is that command sequence number of the destination
+    //  object was already incremented in find_session function.
     command_t cmd;
     cmd.destination = destination_;
     cmd.type = command_t::attach;
diff --git a/src/object.hpp b/src/object.hpp
index 02a071a..31c8c40 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -30,6 +30,10 @@ namespace zmq
 
     class object_t
     {
+        //  Repository of sessions needs to use caller's send_* functions
+        //  when creating new session. TODO: Get rid of this dependency.
+        friend class socket_base_t;
+
     public:
 
         object_t (class dispatcher_t *dispatcher_, int thread_slot_);
diff --git a/src/owned.cpp b/src/owned.cpp
index 6995a39..810bed7 100644
--- a/src/owned.cpp
+++ b/src/owned.cpp
@@ -47,6 +47,14 @@ void zmq::owned_t::process_plug ()
     finalise_command ();
 }
 
+void zmq::owned_t::process_attach (zmq_engine_t *engine_)
+{
+    //  Keep track of how many commands were processed so far.
+    processed_seqnum++;
+
+    finalise_command ();
+}
+
 void zmq::owned_t::term ()
 {
     send_term_req (owner, this);
@@ -65,8 +73,8 @@ void zmq::owned_t::finalise_command ()
     //  If termination request was already received and there are no more
     //  commands to wait for, terminate the object.
     if (shutting_down && processed_seqnum == sent_seqnum.get ()) {
-        send_term_ack (owner);
         process_unplug ();
+        send_term_ack (owner);
         delete this;
     }
 }
diff --git a/src/owned.hpp b/src/owned.hpp
index 22595d1..78036a3 100644
--- a/src/owned.hpp
+++ b/src/owned.hpp
@@ -59,6 +59,10 @@ namespace zmq
         //  handler.
         void process_plug ();
 
+        //  It's vital that session invokes io_object_t::process_attach
+        //  at the end of it's own attach handler.
+        void process_attach (class zmq_engine_t *engine_);
+
         //  io_object_t defines a new handler used to disconnect the object
         //  from the poller object. Implement the handlen in the derived
         //  classes to ensure sane cleanup.
diff --git a/src/pipe.hpp b/src/pipe.hpp
index d771120..28e4b4d 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -29,7 +29,7 @@ namespace zmq
 {
 
     //  Message pipe.
-    class pipe_t : public ypipe_t <zmq_msg, false, message_pipe_granularity>
+    class pipe_t : public ypipe_t <zmq_msg_t, false, message_pipe_granularity>
     {
     };
 
diff --git a/src/session.cpp b/src/session.cpp
index 2bb4ff6..fc1f858 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -22,9 +22,10 @@
 #include "err.hpp"
 
 zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
-      zmq_engine_t *engine_) :
+      const char *name_) :
     owned_t (parent_, owner_),
-    engine (engine_)
+    engine (NULL),
+    name (name_)
 {
 }
 
@@ -32,12 +33,12 @@ zmq::session_t::~session_t ()
 {
 }
 
-bool zmq::session_t::read (::zmq_msg *msg_)
+bool zmq::session_t::read (::zmq_msg_t *msg_)
 {
     return false;
 }
 
-bool zmq::session_t::write (::zmq_msg *msg_)
+bool zmq::session_t::write (::zmq_msg_t *msg_)
 {
     return false;
 }
@@ -48,14 +49,34 @@ void zmq::session_t::flush ()
 
 void zmq::session_t::process_plug ()
 {
-    zmq_assert (engine);
-    engine->plug (this);
+    //  Register the session with the socket.
+    bool ok = owner->register_session (name.c_str (), this);
+
+    //  There's already a session with the specified identity.
+    //  We should syslog it and drop the session. TODO
+    zmq_assert (ok);
+
     owned_t::process_plug ();
 }
 
 void zmq::session_t::process_unplug ()
 {
-    zmq_assert (engine);
-    engine->unplug ();
-    delete engine;
+    //  Unregister the session from the socket.
+    bool ok = owner->unregister_session (name.c_str ());
+    zmq_assert (ok);
+
+    if (engine) {
+        engine->unplug ();
+        delete engine;
+        engine = NULL;
+    }
+}
+
+void zmq::session_t::process_attach (class zmq_engine_t *engine_)
+{
+    zmq_assert (engine_);
+    engine = engine_;
+    engine->plug (this);
+
+    owned_t::process_attach (engine_);
 }
diff --git a/src/session.hpp b/src/session.hpp
index 2cb8e18..6d6bcf7 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -20,8 +20,11 @@
 #ifndef __ZMQ_SESSION_HPP_INCLUDED__
 #define __ZMQ_SESSION_HPP_INCLUDED__
 
+#include <string>
+
 #include "i_inout.hpp"
 #include "owned.hpp"
+#include "options.hpp"
 
 namespace zmq
 {
@@ -30,24 +33,28 @@ namespace zmq
     {
     public:
 
-        session_t (object_t *parent_, socket_base_t *owner_,
-            class zmq_engine_t *engine_);
+        session_t (object_t *parent_, socket_base_t *owner_, const char *name_);
 
     private:
 
         ~session_t ();
 
         //  i_inout interface implementation.
-        bool read (::zmq_msg *msg_);
-        bool write (::zmq_msg *msg_);
+        bool read (::zmq_msg_t *msg_);
+        bool write (::zmq_msg_t *msg_);
         void flush ();
 
         //  Handlers for incoming commands.
         void process_plug ();
         void process_unplug ();
+        void process_attach (class zmq_engine_t *engine_);
 
         class zmq_engine_t *engine;
 
+        //  The name of the session. One that is used to register it with
+        //  socket-level repository of sessions.
+        std::string name;
+
         session_t (const session_t&);
         void operator = (const session_t&);
     };
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index fa6c1e3..fb7bdcf 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -17,6 +17,7 @@
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
+#include <string>
 #include <algorithm>
 
 #include "../include/zmq.h"
@@ -30,16 +31,20 @@
 #include "session.hpp"
 #include "config.hpp"
 #include "owned.hpp"
+#include "uuid.hpp"
 
 zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
     object_t (parent_),
     pending_term_acks (0),
-    app_thread (parent_)
+    app_thread (parent_),
+    shutting_down (false)
 {    
 }
 
 zmq::socket_base_t::~socket_base_t ()
 {
+    shutting_down = true;
+
     while (true) {
 
         //  On third pass of the loop there should be no more I/O objects
@@ -64,10 +69,12 @@ zmq::socket_base_t::~socket_base_t ()
     }
 
     //  Check whether there are no session leaks.
+    sessions_sync.lock ();
     zmq_assert (sessions.empty ());
+    sessions_sync.unlock ();
 }
 
-int zmq::socket_base_t::setsockopt (int option_, void *optval_,
+int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
     size_t optvallen_)
 {
     switch (option_) {
@@ -113,11 +120,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_,
         return 0;
 
     case ZMQ_IDENTITY:
-        if (optvallen_ != sizeof (const char*)) {
-            errno = EINVAL;
-            return -1;
-        }
-        options.identity = (const char*) optval_;
+        options.identity.assign ((const char*) optval_, optvallen_);
         return 0;
 
     default:
@@ -141,18 +144,34 @@ int zmq::socket_base_t::bind (const char *addr_)
 
 int zmq::socket_base_t::connect (const char *addr_)
 {
+    //  Generate a unique name for the session.
+    std::string session_name ("#");
+    session_name += uuid_t ().to_string ();
+
+    //  Create the session.
+    io_thread_t *io_thread = choose_io_thread (options.affinity);
+    session_t *session = new session_t (io_thread, this, session_name.c_str ());
+    zmq_assert (session);
+    send_plug (session);
+    send_own (this, session);
+
+    //  Create the connecter object. Supply it with the session name so that
+    //  it can bind the new connection to the session once it is established.
     zmq_connecter_t *connecter = new zmq_connecter_t (
-        choose_io_thread (options.affinity), this, options);
+        choose_io_thread (options.affinity), this, options,
+        session_name.c_str ());
     int rc = connecter->set_address (addr_);
-    if (rc != 0)
+    if (rc != 0) {
+        delete connecter;
         return -1;
-
+    }
     send_plug (connecter);
     send_own (this, connecter);
+
     return 0;
 }
 
-int zmq::socket_base_t::send (struct zmq_msg *msg_, int flags_)
+int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
 {
     zmq_assert (false);
 }
@@ -162,7 +181,7 @@ int zmq::socket_base_t::flush ()
     zmq_assert (false);
 }
 
-int zmq::socket_base_t::recv (struct zmq_msg *msg_, int flags_)
+int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
 {
     zmq_assert (false);
 }
@@ -174,35 +193,40 @@ int zmq::socket_base_t::close ()
     return 0;
 }
 
-void zmq::socket_base_t::register_session (const char *name_,
+bool zmq::socket_base_t::register_session (const char *name_,
     session_t *session_)
 {
     sessions_sync.lock ();
-    bool inserted = sessions.insert (std::make_pair (name_, session_)).second;
-    zmq_assert (inserted);
+    bool registered = sessions.insert (std::make_pair (name_, session_)).second;
     sessions_sync.unlock ();
+    return registered;
 }
 
-void zmq::socket_base_t::unregister_session (const char *name_)
+bool zmq::socket_base_t::unregister_session (const char *name_)
 {
     sessions_sync.lock ();
     sessions_t::iterator it = sessions.find (name_);
-    zmq_assert (it != sessions.end ());
+    bool unregistered = (it != sessions.end ());
     sessions.erase (it);
     sessions_sync.unlock ();
+    return unregistered;
 }
 
-zmq::session_t *zmq::socket_base_t::get_session (const char *name_)
+zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
 {
     sessions_sync.lock ();
+
     sessions_t::iterator it = sessions.find (name_);
-    session_t *session = NULL;
-    if (it != sessions.end ()) {
-        session = it->second;
-        session->inc_seqnum ();
-    }   
+    if (it == sessions.end ()) {
+        sessions_sync.unlock ();
+        return NULL;
+    }
+
+    //  Prepare the session for subsequent attach command.
+    it->second->inc_seqnum ();
+
     sessions_sync.unlock ();
-    return session;
+    return it->second;    
 }
 
 void zmq::socket_base_t::process_own (owned_t *object_)
@@ -212,6 +236,11 @@ void zmq::socket_base_t::process_own (owned_t *object_)
 
 void zmq::socket_base_t::process_term_req (owned_t *object_)
 {
+    //  When shutting down we can ignore termination requests from owned
+    //  objects. They are going to be terminated anyway.
+    if (shutting_down)
+        return;
+
     //  If I/O object is well and alive ask it to terminate.
     io_objects_t::iterator it = std::find (io_objects.begin (),
         io_objects.end (), object_);
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 8e99654..20ac4e2 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -40,23 +40,22 @@ namespace zmq
         ~socket_base_t ();
 
         //  Interface for communication with the API layer.
-        virtual int setsockopt (int option_, void *optval_, size_t optvallen_);
+        virtual int setsockopt (int option_, const void *optval_,
+            size_t optvallen_);
         virtual int bind (const char *addr_);
         virtual int connect (const char *addr_);
-        virtual int send (struct zmq_msg *msg_, int flags_);
+        virtual int send (struct zmq_msg_t *msg_, int flags_);
         virtual int flush ();
-        virtual int recv (struct zmq_msg *msg_, int flags_);
+        virtual int recv (struct zmq_msg_t *msg_, int flags_);
         virtual int close ();
 
-        //  Functions that owned objects use to manipulate socket's list
-        //  of existing sessions.
-        //  Note that this functionality cannot be implemented via inter-thread
+        //  The list of sessions cannot be accessed via inter-thread
         //  commands as it is unacceptable to wait for the completion of the
         //  action till user application yields control of the application
         //  thread to 0MQ.
-        void register_session (const char *name_, class session_t *session_);
-        void unregister_session (const char *name_);
-        class session_t *get_session (const char *name_);
+        bool register_session (const char *name_, class session_t *session_);
+        bool unregister_session (const char *name_);
+        class session_t *find_session (const char *name_);
 
     private:
 
@@ -80,10 +79,17 @@ namespace zmq
         //  Socket options.
         options_t options;
 
+        //  If true, socket is already shutting down. No new work should be
+        //  started.
+        bool shutting_down;
+
         //  List of existing sessions. This list is never referenced from within
         //  the socket, instead it is used by I/O objects owned by the session.
         //  As those objects can live in different threads, the access is
         //  synchronised using 'sessions_sync' mutex.
+        //  Local sessions are those named by the local instance of 0MQ.
+        //  Remote sessions are the sessions who's identities are provided by
+        //  the remote party.
         typedef std::map <std::string, session_t*> sessions_t;
         sessions_t sessions;
         mutex_t sessions_sync;
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 7ebe003..49096ad 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -26,76 +26,79 @@
 #include "socket_base.hpp"
 #include "err.hpp"
 #include "dispatcher.hpp"
-#include "msg.hpp"
+#include "msg_content.hpp"
 
-int zmq_msg_init (zmq_msg *msg_)
+int zmq_msg_init (zmq_msg_t *msg_)
 {
-    msg_->content = (zmq_msg_content*) ZMQ_VSM;
+    msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
     msg_->vsm_size = 0;
     return 0;
 }
 
-int zmq_msg_init_size (zmq_msg *msg_, size_t size_)
+int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
 {
     if (size_ <= ZMQ_MAX_VSM_SIZE) {
-        msg_->content = (zmq_msg_content*) ZMQ_VSM;
+        msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
         msg_->vsm_size = (uint16_t) size_;
     }
     else {
-        msg_->content = (zmq_msg_content*) malloc (sizeof (zmq_msg_content) +
-            size_);
+        msg_->content =
+            (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_);
         if (!msg_->content) {
             errno = ENOMEM;
             return -1;
         }
         msg_->shared = 0;
-            
-        msg_->content->data = (void*) (msg_->content + 1);
-        msg_->content->size = size_;
-        msg_->content->ffn = NULL;
-        new (&msg_->content->refcnt) zmq::atomic_counter_t ();
+
+        zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
+        content->data = (void*) (content + 1);
+        content->size = size_;
+        content->ffn = NULL;
+        new (&content->refcnt) zmq::atomic_counter_t ();
     }
     return 0;
 }
 
-int zmq_msg_init_data (zmq_msg *msg_, void *data_, size_t size_,
+int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
     zmq_free_fn *ffn_)
 {
     msg_->shared = 0;
-    msg_->content = (zmq_msg_content*) malloc (sizeof (zmq_msg_content));
+    msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
     zmq_assert (msg_->content);
-    msg_->content->data = data_;
-    msg_->content->size = size_;
-    msg_->content->ffn = ffn_;
-    new (&msg_->content->refcnt) zmq::atomic_counter_t ();
+    zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
+    content->data = data_;
+    content->size = size_;
+    content->ffn = ffn_;
+    new (&content->refcnt) zmq::atomic_counter_t ();
     return 0;
 }
 
-int zmq_msg_close (zmq_msg *msg_)
+int zmq_msg_close (zmq_msg_t *msg_)
 {
-    //  For VSMs and delimiters there are no resources to free
-    if (msg_->content == (zmq_msg_content*) ZMQ_DELIMITER ||
-          msg_->content == (zmq_msg_content*) ZMQ_VSM ||
-          msg_->content == (zmq_msg_content*) ZMQ_GAP)
+    //  For VSMs and delimiters there are no resources to free.
+    if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER ||
+          msg_->content == (zmq::msg_content_t*) ZMQ_VSM ||
+          msg_->content == (zmq::msg_content_t*) ZMQ_GAP)
         return 0;
 
-    //  If the content is not shared, or if it is shared and the reference
+    //  If the content is not shared, or if it is shared and the reference.
     //  count has dropped to zero, deallocate it.
-    if (!msg_->shared || !msg_->content->refcnt.sub (1)) {
+    zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
+    if (!msg_->shared || !content->refcnt.sub (1)) {
 
-        //  We used "placement new" operator to initialize the reference
+        //  We used "placement new" operator to initialize the reference.
         //  counter so we call its destructor now.
-        msg_->content->refcnt.~atomic_counter_t ();
+        content->refcnt.~atomic_counter_t ();
 
-        if (msg_->content->ffn)
-            msg_->content->ffn (msg_->content->data);
-        free (msg_->content);
+        if (content->ffn)
+            content->ffn (content->data);
+        free (content);
     }
 
     return 0;
 }
 
-int zmq_msg_move (zmq_msg *dest_, zmq_msg *src_)
+int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
 {
     zmq_msg_close (dest_);
     *dest_ = *src_;
@@ -103,23 +106,24 @@ int zmq_msg_move (zmq_msg *dest_, zmq_msg *src_)
     return 0;
 }
 
-int zmq_msg_copy (zmq_msg *dest_, zmq_msg *src_)
+int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
 {
     zmq_msg_close (dest_);
 
     //  VSMs and delimiters require no special handling.
     if (src_->content !=
-          (zmq_msg_content*) ZMQ_DELIMITER &&
-          src_->content != (zmq_msg_content*) ZMQ_VSM &&
-          src_->content != (zmq_msg_content*) ZMQ_GAP) {
+          (zmq::msg_content_t*) ZMQ_DELIMITER &&
+          src_->content != (zmq::msg_content_t*) ZMQ_VSM &&
+          src_->content != (zmq::msg_content_t*) ZMQ_GAP) {
 
         //  One reference is added to shared messages. Non-shared messages
         //  are turned into shared messages and reference count is set to 2.
+        zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content;
         if (src_->shared)
-            src_->content->refcnt.add (1);
+            content->refcnt.add (1);
         else {
             src_->shared = true;
-            src_->content->refcnt.set (2);
+            content->refcnt.set (2);
         }
     }
 
@@ -127,32 +131,34 @@ int zmq_msg_copy (zmq_msg *dest_, zmq_msg *src_)
     return 0;
 }
 
-void *zmq_msg_data (zmq_msg *msg_)
+void *zmq_msg_data (zmq_msg_t *msg_)
 {
-    if (msg_->content == (zmq_msg_content*) ZMQ_VSM)
+    if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
         return msg_->vsm_data;
     if (msg_->content ==
-          (zmq_msg_content*) ZMQ_DELIMITER ||
-          msg_->content == (zmq_msg_content*) ZMQ_GAP)
+          (zmq::msg_content_t*) ZMQ_DELIMITER ||
+          msg_->content == (zmq::msg_content_t*) ZMQ_GAP)
         return NULL;
-    return msg_->content->data;
+
+    return ((zmq::msg_content_t*) msg_->content)->data;
 }
 
-size_t zmq_msg_size (zmq_msg *msg_)
+size_t zmq_msg_size (zmq_msg_t *msg_)
 {
-    if (msg_->content == (zmq_msg_content*) ZMQ_VSM)
+    if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
         return msg_->vsm_size;
     if (msg_->content ==
-          (zmq_msg_content*) ZMQ_DELIMITER ||
-          msg_->content == (zmq_msg_content*) ZMQ_GAP)
+          (zmq::msg_content_t*) ZMQ_DELIMITER ||
+          msg_->content == (zmq::msg_content_t*) ZMQ_GAP)
         return 0;
-    return msg_->content->size;
+
+    return ((zmq::msg_content_t*) msg_->content)->size;
 }
 
-int zmq_msg_type (zmq_msg *msg_)
+int zmq_msg_type (zmq_msg_t *msg_)
 {
     //  If it's a genuine message, return 0.
-    if (msg_->content >= (zmq_msg_content*) ZMQ_VSM)
+    if (msg_->content >= (zmq::msg_content_t*) ZMQ_VSM)
             return 0;
 
     //   Trick the compiler to believe that content is an integer.
@@ -192,7 +198,8 @@ int zmq_close (void *s_)
     return 0;
 }
 
-int zmq_setsockopt (void *s_, int option_, void *optval_, size_t optvallen_)
+int zmq_setsockopt (void *s_, int option_, const void *optval_,
+    size_t optvallen_)
 {
     return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_,
         optvallen_));
@@ -208,7 +215,7 @@ int zmq_connect (void *s_, const char *addr_)
     return (((zmq::socket_base_t*) s_)->connect (addr_));
 }
 
-int zmq_send (void *s_, zmq_msg *msg_, int flags_)
+int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)
 {
     return (((zmq::socket_base_t*) s_)->send (msg_, flags_));
 }
@@ -218,7 +225,7 @@ int zmq_flush (void *s_)
     return (((zmq::socket_base_t*) s_)->flush ());
 }
 
-int zmq_recv (void *s_, zmq_msg *msg_, int flags_)
+int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
 {
     return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
 }
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index 00c8cb2..e4e7eea 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -18,16 +18,18 @@
 */
 
 #include "zmq_connecter.hpp"
-#include "zmq_init.hpp"
+#include "zmq_connecter_init.hpp"
 #include "io_thread.hpp"
 #include "err.hpp"
 
 zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_,
-      socket_base_t *owner_, const options_t &options_) :
+      socket_base_t *owner_, const options_t &options_,
+      const char *session_name_) :
     owned_t (parent_, owner_),
     io_object_t (parent_),
     handle_valid (false),
-    options (options_)
+    options (options_),
+    session_name (session_name_)
 {
 }
 
@@ -76,7 +78,8 @@ void zmq::zmq_connecter_t::out_event ()
 
     //  Create an init object. 
     io_thread_t *io_thread = choose_io_thread (options.affinity);
-    zmq_init_t *init = new zmq_init_t (io_thread, owner, fd, true, options);
+    zmq_connecter_init_t *init = new zmq_connecter_init_t (io_thread, owner,
+        fd, options, session_name.c_str ());
     zmq_assert (init);
     send_plug (init);
     send_own (owner, init);
diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp
index dcdec19..e308502 100644
--- a/src/zmq_connecter.hpp
+++ b/src/zmq_connecter.hpp
@@ -20,6 +20,8 @@
 #ifndef __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__
 #define __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__
 
+#include <string>
+
 #include "owned.hpp"
 #include "io_object.hpp"
 #include "tcp_connecter.hpp"
@@ -34,15 +36,14 @@ namespace zmq
     public:
 
         zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_,
-            const options_t &options_);
+            const options_t &options_, const char *session_name_);
+        ~zmq_connecter_t ();
 
         //  Set IP address to connect to.
         int set_address (const char *addr_);
 
     private:
 
-        ~zmq_connecter_t ();
-
         //  Handlers for incoming commands.
         void process_plug ();
         void process_unplug ();
@@ -68,6 +69,9 @@ namespace zmq
         //  Associated socket options.
         options_t options;
 
+        //  Name of the session associated with the connecter.
+        std::string session_name;
+
         zmq_connecter_t (const zmq_connecter_t&);
         void operator = (const zmq_connecter_t&);
     };
diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp
new file mode 100644
index 0000000..7048bd1
--- /dev/null
+++ b/src/zmq_connecter_init.cpp
@@ -0,0 +1,94 @@
+/*
+    Copyright (c) 2007-2009 FastMQ Inc.
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "zmq_connecter_init.hpp"
+#include "io_thread.hpp"
+#include "session.hpp"
+#include "err.hpp"
+
+zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_,
+      socket_base_t *owner_, fd_t fd_, const options_t &options_,
+      const char *session_name_) :
+    owned_t (parent_, owner_),
+    options (options_),
+    session_name (session_name_)
+{
+    //  Create associated engine object.
+    engine = new zmq_engine_t (parent_, fd_);
+    zmq_assert (engine);
+}
+
+zmq::zmq_connecter_init_t::~zmq_connecter_init_t ()
+{
+    if (engine)
+        delete engine;
+}
+
+bool zmq::zmq_connecter_init_t::read (::zmq_msg_t *msg_)
+{
+    //  Send identity.
+    int rc = zmq_msg_init_size (msg_, options.identity.size ());
+    zmq_assert (rc == 0);
+    memcpy (zmq_msg_data (msg_), options.identity.c_str (),
+        options.identity.size ());
+
+    //  Initialisation is done at this point. Disconnect the engine from
+    //  the init object.
+    engine->unplug ();
+
+    //  Find the session associated with this connecter. If it doesn't exist
+    //  drop the newly created connection. If it does, attach it to the
+    //  connection.
+    session_t *session = owner->find_session (session_name.c_str ());
+    if (!session) {
+        //  TODO
+        zmq_assert (false);
+    }
+    send_attach (session, engine);
+    engine = NULL;
+
+    //  Destroy the init object.
+    term ();
+
+    return true;
+}
+
+bool zmq::zmq_connecter_init_t::write (::zmq_msg_t *msg_)
+{
+    return false;
+}
+
+void zmq::zmq_connecter_init_t::flush ()
+{
+    //  We are not expecting any messages. No point in flushing.
+    zmq_assert (false);
+}
+
+void zmq::zmq_connecter_init_t::process_plug ()
+{
+    zmq_assert (engine);
+    engine->plug (this);
+    owned_t::process_plug ();
+}
+
+void zmq::zmq_connecter_init_t::process_unplug ()
+{
+    if (engine)
+        engine->unplug ();
+}
diff --git a/src/zmq_connecter_init.hpp b/src/zmq_connecter_init.hpp
new file mode 100644
index 0000000..79ea9e2
--- /dev/null
+++ b/src/zmq_connecter_init.hpp
@@ -0,0 +1,75 @@
+/*
+    Copyright (c) 2007-2009 FastMQ Inc.
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_ZMQ_CONNECTER_INIT_HPP_INCLUDED__
+#define __ZMQ_ZMQ_CONNECTER_INIT_HPP_INCLUDED__
+
+#include <string>
+
+#include "i_inout.hpp"
+#include "owned.hpp"
+#include "zmq_engine.hpp"
+#include "stdint.hpp"
+#include "fd.hpp"
+#include "options.hpp"
+
+namespace zmq
+{
+
+    //  The class handles initialisation phase of native 0MQ wire-level
+    //  protocol on the connecting side of the connection.
+
+    class zmq_connecter_init_t : public owned_t, public i_inout
+    {
+    public:
+
+        zmq_connecter_init_t (class io_thread_t *parent_, socket_base_t *owner_,
+            fd_t fd_, const options_t &options, const char *session_name_);
+        ~zmq_connecter_init_t ();
+
+    private:
+
+        //  i_inout interface implementation.
+        bool read (::zmq_msg_t *msg_);
+        bool write (::zmq_msg_t *msg_);
+        void flush ();
+
+        //  Handlers for incoming commands.
+        void process_plug ();
+        void process_unplug ();
+
+        //  Engine is created by zmq_connecter_init_t object. Once the
+        //  initialisation phase is over it is passed to a session object,
+        //  possibly running in a different I/O thread.
+        zmq_engine_t *engine;
+
+        //  Associated socket options.
+        options_t options;
+
+        //  Name of the session to bind new connection to. Makes sense only
+        //  when 'connected' is true.
+        std::string session_name;
+
+        zmq_connecter_init_t (const zmq_connecter_init_t&);
+        void operator = (const zmq_connecter_init_t&);
+    };
+
+}
+
+#endif
diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp
index 17c28f8..212e68e 100644
--- a/src/zmq_decoder.hpp
+++ b/src/zmq_decoder.hpp
@@ -45,7 +45,7 @@ namespace zmq
 
         struct i_inout *destination;
         unsigned char tmpbuf [8];
-        ::zmq_msg in_progress;
+        ::zmq_msg_t in_progress;
 
         zmq_decoder_t (const zmq_decoder_t&);
         void operator = (const zmq_decoder_t&);
diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp
index 89af265..29563fc 100644
--- a/src/zmq_encoder.hpp
+++ b/src/zmq_encoder.hpp
@@ -43,7 +43,7 @@ namespace zmq
         bool message_ready ();
 
         struct i_inout *source;
-        ::zmq_msg in_progress;
+        ::zmq_msg_t in_progress;
         unsigned char tmpbuf [9];
 
         zmq_encoder_t (const zmq_encoder_t&);
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
deleted file mode 100644
index 124622d..0000000
--- a/src/zmq_init.cpp
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
-    Copyright (c) 2007-2009 FastMQ Inc.
-
-    This file is part of 0MQ.
-
-    0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    0MQ is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
-
-    You should have received a copy of the Lesser GNU General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "zmq_init.hpp"
-#include "io_thread.hpp"
-#include "session.hpp"
-#include "err.hpp"
-
-zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
-      fd_t fd_, bool connected_, const options_t &options_) :
-    owned_t (parent_, owner_),
-    connected (connected_),
-    options (options_)
-{
-    //  Create associated engine object.
-    engine = new zmq_engine_t (parent_, fd_);
-    zmq_assert (engine);
-}
-
-zmq::zmq_init_t::~zmq_init_t ()
-{
-    if (engine)
-        delete engine;
-}
-
-bool zmq::zmq_init_t::read (::zmq_msg *msg_)
-{
-    //  On the listening side, no initialisation data are sent to the peer.
-    if (!connected)
-        return false;
-
-    //  Send identity.
-    int rc = zmq_msg_init_size (msg_, options.identity.size ());
-    zmq_assert (rc == 0);
-    memcpy (zmq_msg_data (msg_), options.identity.c_str (),
-        options.identity.size ());
-
-    //  Initialisation is done.
-    create_session ();
-
-    return true;
-}
-
-bool zmq::zmq_init_t::write (::zmq_msg *msg_)
-{
-    //  On the connecting side no initialisation data are expected.
-    if (connected)
-        return false;
-
-    //  Retreieve the identity.
-    options.identity = std::string ((const char*) zmq_msg_data (msg_),
-        zmq_msg_size (msg_));
-
-    //  Initialisation is done.
-    create_session ();
-
-    return true;
-}
-
-void zmq::zmq_init_t::flush ()
-{
-    //  No need to do anything. zmq_init_t does no batching of messages.
-    //  Each message is processed immediately on write.
-}
-
-void zmq::zmq_init_t::process_plug ()
-{
-    zmq_assert (engine);
-    engine->plug (this);
-    owned_t::process_plug ();
-}
-
-void zmq::zmq_init_t::process_unplug ()
-{
-    if (engine)
-        engine->unplug ();
-}
-
-void zmq::zmq_init_t::create_session ()
-{
-    //  Disconnect engine from the init object.
-    engine->unplug ();
-
-    //  Create the session instance.
-    io_thread_t *io_thread = choose_io_thread (options.affinity);
-    session_t *session = new session_t (io_thread, owner, engine);
-    zmq_assert (session);
-    engine = NULL;
-
-    //  Pass session/engine pair to a chosen I/O thread.
-    send_plug (session);
-    send_own (owner, session);
-
-    //  Destroy the init object.
-    term ();
-}
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
deleted file mode 100644
index 5eb289e..0000000
--- a/src/zmq_init.hpp
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
-    Copyright (c) 2007-2009 FastMQ Inc.
-
-    This file is part of 0MQ.
-
-    0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the Lesser GNU General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    0MQ is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    Lesser GNU General Public License for more details.
-
-    You should have received a copy of the Lesser GNU General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
-#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
-
-#include <string>
-
-#include "i_inout.hpp"
-#include "owned.hpp"
-#include "zmq_engine.hpp"
-#include "stdint.hpp"
-#include "fd.hpp"
-#include "options.hpp"
-
-namespace zmq
-{
-
-    //  The class handles initialisation phase of native 0MQ wire-level
-    //  protocol. Currently it can be used to handle both sides of the
-    //  connection. If it grows to complex, we can separate the two into
-    //  distinct classes.
-
-    class zmq_init_t : public owned_t, public i_inout
-    {
-    public:
-
-        //  Set 'connected' to true if the connection was created by 'connect'
-        //  function. If it was accepted from a listening socket, set it to
-        //  false.
-        zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_, fd_t fd_,
-            bool connected_, const options_t &options);
-        ~zmq_init_t ();
-
-    private:
-
-        //  i_inout interface implementation.
-        bool read (::zmq_msg *msg_);
-        bool write (::zmq_msg *msg_);
-        void flush ();
-
-        //  Handlers for incoming commands.
-        void process_plug ();
-        void process_unplug ();
-
-        void create_session ();
-
-        //  Engine is created by zmq_init_t object. Once the initialisation
-        //  phase is over it is passed to a session object, possibly running
-        //  in a different I/O thread.
-        zmq_engine_t *engine;
-
-        //  If true, we are on the connecting side. If false, we are on the
-        //  listening side.
-        bool connected;
-
-        //  Associated socket options.
-        options_t options;
-
-        zmq_init_t (const zmq_init_t&);
-        void operator = (const zmq_init_t&);
-    };
-
-}
-
-#endif
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index 49bbf61..97164c1 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -18,7 +18,7 @@
 */
 
 #include "zmq_listener.hpp"
-#include "zmq_init.hpp"
+#include "zmq_listener_init.hpp"
 #include "io_thread.hpp"
 #include "err.hpp"
 
@@ -68,7 +68,8 @@ void zmq::zmq_listener_t::in_event ()
 
     //  Create an init object. 
     io_thread_t *io_thread = choose_io_thread (options.affinity);
-    zmq_init_t *init = new zmq_init_t (io_thread, owner, fd, false, options);
+    zmq_listener_init_t *init = new zmq_listener_init_t (io_thread, owner,
+        fd, options);
     zmq_assert (init);
     send_plug (init);
     send_own (owner, init);
diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp
new file mode 100644
index 0000000..bfd79b4
--- /dev/null
+++ b/src/zmq_listener_init.cpp
@@ -0,0 +1,96 @@
+/*
+    Copyright (c) 2007-2009 FastMQ Inc.
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <string>
+
+#include "zmq_listener_init.hpp"
+#include "io_thread.hpp"
+#include "session.hpp"
+#include "err.hpp"
+
+zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_,
+      socket_base_t *owner_, fd_t fd_, const options_t &options_) :
+    owned_t (parent_, owner_),
+    options (options_)
+{
+    //  Create associated engine object.
+    engine = new zmq_engine_t (parent_, fd_);
+    zmq_assert (engine);
+}
+
+zmq::zmq_listener_init_t::~zmq_listener_init_t ()
+{
+    if (engine)
+        delete engine;
+}
+
+bool zmq::zmq_listener_init_t::read (::zmq_msg_t *msg_)
+{
+    return false;
+}
+
+bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
+{
+    //  Retreieve the remote identity. We'll use it as a local session name.
+    std::string session_name = std::string ((const char*) zmq_msg_data (msg_),
+        zmq_msg_size (msg_));
+
+    //  Initialisation is done. Disconnect the engine from the init object.
+    engine->unplug ();
+
+    //  Have a look whether the session already exists. If it does, attach it
+    //  to the engine. If it doesn't create it first.
+    session_t *session = owner->find_session (session_name.c_str ());
+    if (!session) {
+        io_thread_t *io_thread = choose_io_thread (options.affinity);
+        session = new session_t (io_thread, owner, session_name.c_str ());
+        zmq_assert (session);
+        send_plug (session);
+        send_own (owner, session);
+
+        //  Reserve a sequence number for following 'attach' command.
+        session->inc_seqnum ();
+    }
+    send_attach (session, engine);
+    engine = NULL;
+
+    //  Destroy the init object.
+    term ();
+
+    return true;
+}
+
+void zmq::zmq_listener_init_t::flush ()
+{
+    //  No need to do anything. zmq_listener_init_t does no batching
+    //  of messages. Each message is processed immediately on write.
+}
+
+void zmq::zmq_listener_init_t::process_plug ()
+{
+    zmq_assert (engine);
+    engine->plug (this);
+    owned_t::process_plug ();
+}
+
+void zmq::zmq_listener_init_t::process_unplug ()
+{
+    if (engine)
+        engine->unplug ();
+}
diff --git a/src/zmq_listener_init.hpp b/src/zmq_listener_init.hpp
new file mode 100644
index 0000000..b061eaa
--- /dev/null
+++ b/src/zmq_listener_init.hpp
@@ -0,0 +1,71 @@
+/*
+    Copyright (c) 2007-2009 FastMQ Inc.
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the Lesser GNU General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    Lesser GNU General Public License for more details.
+
+    You should have received a copy of the Lesser GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_ZMQ_LISTENER_INIT_HPP_INCLUDED__
+#define __ZMQ_ZMQ_LISTENER_INIT_HPP_INCLUDED__
+
+#include <string>
+
+#include "i_inout.hpp"
+#include "owned.hpp"
+#include "zmq_engine.hpp"
+#include "stdint.hpp"
+#include "fd.hpp"
+#include "options.hpp"
+
+namespace zmq
+{
+
+    //  The class handles initialisation phase of native 0MQ wire-level
+    //  protocol on the listening side of the connection.
+
+    class zmq_listener_init_t : public owned_t, public i_inout
+    {
+    public:
+
+        zmq_listener_init_t (class io_thread_t *parent_, socket_base_t *owner_,
+            fd_t fd_, const options_t &options);
+        ~zmq_listener_init_t ();
+
+    private:
+
+        //  i_inout interface implementation.
+        bool read (::zmq_msg_t *msg_);
+        bool write (::zmq_msg_t *msg_);
+        void flush ();
+
+        //  Handlers for incoming commands.
+        void process_plug ();
+        void process_unplug ();
+
+        //  Engine is created by zmq_listener_init_t object. Once the
+        //  initialisation phase is over it is passed to a session object,
+        //  possibly running in a different I/O thread.
+        zmq_engine_t *engine;
+
+        //  Associated socket options.
+        options_t options;
+
+        zmq_listener_init_t (const zmq_listener_init_t&);
+        void operator = (const zmq_listener_init_t&);
+    };
+
+}
+
+#endif
-- 
cgit v1.2.3