From 7c1dca546d9e49e7af372e4fff9e6a87058a7f12 Mon Sep 17 00:00:00 2001
From: Martin Sustrik <sustrik@250bpm.com>
Date: Sun, 24 Jul 2011 18:25:30 +0200
Subject: Session classes merged into a single class

Removal of ZMQ_IDENTITY resulted in various session classes doing
almost the same thing. This patch merges the classes into a single
class.

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
---
 src/Makefile.am           |   4 --
 src/connect_session.cpp   | 124 ----------------------------------------------
 src/connect_session.hpp   |  65 ------------------------
 src/decoder.cpp           |  10 ++--
 src/decoder.hpp           |   4 +-
 src/encoder.cpp           |  10 ++--
 src/encoder.hpp           |   4 +-
 src/i_engine.hpp          |  21 +-------
 src/pgm_receiver.cpp      |  17 ++++---
 src/pgm_receiver.hpp      |   4 +-
 src/pgm_sender.cpp        |  11 ++--
 src/pgm_sender.hpp        |   2 +-
 src/session.cpp           |  99 ++++++++++++++++++++++++++++++------
 src/session.hpp           |  32 ++++++------
 src/socket_base.cpp       |   6 +--
 src/transient_session.cpp |  43 ----------------
 src/transient_session.hpp |  52 -------------------
 src/zmq_engine.cpp        |  51 ++++++++++---------
 src/zmq_engine.hpp        |   9 ++--
 src/zmq_listener.cpp      |   6 +--
 20 files changed, 168 insertions(+), 406 deletions(-)
 delete mode 100644 src/connect_session.cpp
 delete mode 100644 src/connect_session.hpp
 delete mode 100644 src/transient_session.cpp
 delete mode 100644 src/transient_session.hpp

diff --git a/src/Makefile.am b/src/Makefile.am
index 126a321..b02e302 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -12,7 +12,6 @@ libzmq_la_SOURCES = \
     clock.hpp \
     command.hpp \
     config.hpp \
-    connect_session.hpp \
     ctx.hpp \
     decoder.hpp \
     devpoll.hpp \
@@ -64,7 +63,6 @@ libzmq_la_SOURCES = \
     tcp_listener.hpp \
     tcp_socket.hpp \
     thread.hpp \
-    transient_session.hpp \
     trie.hpp \
     windows.hpp \
     wire.hpp \
@@ -79,7 +77,6 @@ libzmq_la_SOURCES = \
     zmq_listener.hpp \
     clock.cpp \
     ctx.cpp \
-    connect_session.cpp \
     decoder.cpp \
     devpoll.cpp \
     dist.cpp \
@@ -122,7 +119,6 @@ libzmq_la_SOURCES = \
     tcp_listener.cpp \
     tcp_socket.cpp \
     thread.cpp \
-    transient_session.cpp \
     trie.cpp \
     xpub.cpp \
     xrep.cpp \
diff --git a/src/connect_session.cpp b/src/connect_session.cpp
deleted file mode 100644
index 14666a6..0000000
--- a/src/connect_session.cpp
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
-    Copyright (c) 2007-2011 iMatix Corporation
-    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
-    This file is part of 0MQ.
-
-    0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the GNU Lesser General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    0MQ is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU Lesser General Public License for more details.
-
-    You should have received a copy of the GNU Lesser General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "connect_session.hpp"
-#include "zmq_connecter.hpp"
-#include "pgm_sender.hpp"
-#include "pgm_receiver.hpp"
-#include "err.hpp"
-
-zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_,
-      class socket_base_t *socket_, const options_t &options_,
-      const char *protocol_, const char *address_) :
-    session_t (io_thread_, socket_, options_),
-    protocol (protocol_),
-    address (address_)
-{
-}
-
-zmq::connect_session_t::~connect_session_t ()
-{
-}
-
-void zmq::connect_session_t::process_plug ()
-{
-    //  Start connection process immediately.
-    start_connecting (false);
-}
-
-void zmq::connect_session_t::start_connecting (bool wait_)
-{
-    //  Choose I/O thread to run connecter in. Given that we are already
-    //  running in an I/O thread, there must be at least one available.
-    io_thread_t *io_thread = choose_io_thread (options.affinity);
-    zmq_assert (io_thread);
-
-    //  Create the connecter object.
-
-    //  Both TCP and IPC transports are using the same infrastructure.
-    if (protocol == "tcp" || protocol == "ipc") {
-
-        zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
-            io_thread, this, options, protocol.c_str (), address.c_str (),
-            wait_);
-        alloc_assert (connecter);
-        launch_child (connecter);
-        return;
-    }
-
-#if defined ZMQ_HAVE_OPENPGM
-
-    //  Both PGM and EPGM transports are using the same infrastructure.
-    if (protocol == "pgm" || protocol == "epgm") {
-
-        //  For EPGM transport with UDP encapsulation of PGM is used.
-        bool udp_encapsulation = (protocol == "epgm");
-
-        //  At this point we'll create message pipes to the session straight
-        //  away. There's no point in delaying it as no concept of 'connect'
-        //  exists with PGM anyway.
-        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
-
-            //  PGM sender.
-            pgm_sender_t *pgm_sender =  new (std::nothrow) pgm_sender_t (
-                io_thread, options);
-            alloc_assert (pgm_sender);
-
-            int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
-            zmq_assert (rc == 0);
-
-            send_attach (this, pgm_sender);
-        }
-        else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) {
-
-            //  PGM receiver.
-            pgm_receiver_t *pgm_receiver =  new (std::nothrow) pgm_receiver_t (
-                io_thread, options);
-            alloc_assert (pgm_receiver);
-
-            int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
-            zmq_assert (rc == 0);
-
-            send_attach (this, pgm_receiver);
-        }
-        else
-            zmq_assert (false);
-
-        return;
-    }
-#endif
-
-    zmq_assert (false);
-}
-
-bool zmq::connect_session_t::xattached ()
-{
-    return true;
-}
-
-bool zmq::connect_session_t::xdetached ()
-{
-    //  Reconnect.
-    start_connecting (true);
-
-    //  Don't tear the session down.
-    return true;
-}
-
diff --git a/src/connect_session.hpp b/src/connect_session.hpp
deleted file mode 100644
index cdd78f8..0000000
--- a/src/connect_session.hpp
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
-    Copyright (c) 2007-2011 iMatix Corporation
-    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
-    This file is part of 0MQ.
-
-    0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the GNU Lesser General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    0MQ is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU Lesser General Public License for more details.
-
-    You should have received a copy of the GNU Lesser General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_CONNECT_SESSION_HPP_INCLUDED__
-#define __ZMQ_CONNECT_SESSION_HPP_INCLUDED__
-
-#include <string>
-
-#include "session.hpp"
-
-namespace zmq
-{
-
-    //  Connect session contains an address to connect to. On disconnect it
-    //  attempts to reconnect.
-
-    class connect_session_t : public session_t
-    {
-    public:
-
-        connect_session_t (class io_thread_t *io_thread_,
-            class socket_base_t *socket_, const options_t &options_,
-            const char *protocol_, const char *address_);
-        ~connect_session_t ();
-
-    private:
-
-        //  Handlers for events from session base class.
-        bool xattached ();
-        bool xdetached ();
-
-        //  Start the connection process.
-        void start_connecting (bool wait_);
-
-        //  Command handlers.
-        void process_plug ();
-
-        //  Address to connect to.
-        std::string protocol;
-        std::string address;
-
-        connect_session_t (const connect_session_t&);
-        const connect_session_t &operator = (const connect_session_t&);
-    };
-
-}
-
-#endif
diff --git a/src/decoder.cpp b/src/decoder.cpp
index 8fc1d5e..01ce0bb 100644
--- a/src/decoder.cpp
+++ b/src/decoder.cpp
@@ -22,13 +22,13 @@
 #include <string.h>
 
 #include "decoder.hpp"
-#include "i_engine.hpp"
+#include "session.hpp"
 #include "wire.hpp"
 #include "err.hpp"
 
 zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
     decoder_base_t <decoder_t> (bufsize_),
-    sink (NULL),
+    session (NULL),
     maxmsgsize (maxmsgsize_)
 {
     int rc = in_progress.init ();
@@ -44,9 +44,9 @@ zmq::decoder_t::~decoder_t ()
     errno_assert (rc == 0);
 }
 
-void zmq::decoder_t::set_sink (i_engine_sink *sink_)
+void zmq::decoder_t::set_session (session_t *session_)
 {
-    sink = sink_;
+    session = session_;
 }
 
 bool zmq::decoder_t::one_byte_size_ready ()
@@ -136,7 +136,7 @@ bool zmq::decoder_t::message_ready ()
 {
     //  Message is completely read. Push it further and start reading
     //  new message. (in_progress is a 0-byte message after this point.)
-    if (!sink || !sink->write (&in_progress))
+    if (!session || !session->write (&in_progress))
         return false;
 
     next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
diff --git a/src/decoder.hpp b/src/decoder.hpp
index 17fab4b..3ac4f7c 100644
--- a/src/decoder.hpp
+++ b/src/decoder.hpp
@@ -184,7 +184,7 @@ namespace zmq
         decoder_t (size_t bufsize_, int64_t maxmsgsize_);
         ~decoder_t ();
 
-        void set_sink (struct i_engine_sink *sink_);
+        void set_session (class session_t *session_);
 
     private:
 
@@ -193,7 +193,7 @@ namespace zmq
         bool flags_ready ();
         bool message_ready ();
 
-        struct i_engine_sink *sink;
+        class session_t *session;
         unsigned char tmpbuf [8];
         msg_t in_progress;
 
diff --git a/src/encoder.cpp b/src/encoder.cpp
index a9be68c..087735d 100644
--- a/src/encoder.cpp
+++ b/src/encoder.cpp
@@ -19,12 +19,12 @@
 */
 
 #include "encoder.hpp"
-#include "i_engine.hpp"
+#include "session.hpp"
 #include "wire.hpp"
 
 zmq::encoder_t::encoder_t (size_t bufsize_) :
     encoder_base_t <encoder_t> (bufsize_),
-    sink (NULL)
+    session (NULL)
 {
     int rc = in_progress.init ();
     errno_assert (rc == 0);
@@ -39,9 +39,9 @@ zmq::encoder_t::~encoder_t ()
     errno_assert (rc == 0);
 }
 
-void zmq::encoder_t::set_sink (i_engine_sink *sink_)
+void zmq::encoder_t::set_session (session_t *session_)
 {
-    sink = sink_;
+    session = session_;
 }
 
 bool zmq::encoder_t::size_ready ()
@@ -62,7 +62,7 @@ bool zmq::encoder_t::message_ready ()
     //  Note that new state is set only if write is successful. That way
     //  unsuccessful write will cause retry on the next state machine
     //  invocation.
-    if (!sink || !sink->read (&in_progress)) {
+    if (!session || !session->read (&in_progress)) {
         rc = in_progress.init ();
         errno_assert (rc == 0);
         return false;
diff --git a/src/encoder.hpp b/src/encoder.hpp
index 4bc22ce..b8784a3 100644
--- a/src/encoder.hpp
+++ b/src/encoder.hpp
@@ -163,14 +163,14 @@ namespace zmq
         encoder_t (size_t bufsize_);
         ~encoder_t ();
 
-        void set_sink (struct i_engine_sink *sink_);
+        void set_session (class session_t *session_);
 
     private:
 
         bool size_ready ();
         bool message_ready ();
 
-        struct i_engine_sink *sink;
+        class session_t *session;
         msg_t in_progress;
         unsigned char tmpbuf [10];
 
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index 636985f..c49a107 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -32,7 +32,7 @@ namespace zmq
 
         //  Plug the engine to the session.
         virtual void plug (class io_thread_t *io_thread_,
-            struct i_engine_sink *sink_) = 0;
+            class session_t *session_) = 0;
 
         //  Unplug the engine from the session.
         virtual void unplug () = 0;
@@ -50,25 +50,6 @@ namespace zmq
         virtual void activate_out () = 0;
     };
 
-    //  Abstract interface to be implemented by engine sinks such as sessions.
-
-    struct i_engine_sink
-    {
-        virtual ~i_engine_sink () {}
-
-        //  Engine asks for a message to send to the network.
-        virtual bool read (class msg_t *msg_) = 0;
-
-        //  Engine received message from the network and sends it further on.
-        virtual bool write (class msg_t *msg_) = 0;
-
-        //  Flush all the previously written messages.
-        virtual void flush () = 0;
-
-        //  Engine is dead. Drop all the references to it.
-        virtual void detach () = 0;
-    };
-
 }
 
 #endif
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index b859241..dcd002e 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -29,9 +29,10 @@
 #endif
 
 #include "pgm_receiver.hpp"
-#include "err.hpp"
+#include "session.hpp"
 #include "stdint.hpp"
 #include "wire.hpp"
+#include "err.hpp"
 
 zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, 
       const options_t &options_) :
@@ -39,7 +40,7 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
     has_rx_timer (false),
     pgm_socket (true, options_),
     options (options_),
-    sink (NULL),
+    session (NULL),
     mru_decoder (NULL),
     pending_bytes (0)
 {
@@ -56,7 +57,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
     return pgm_socket.init (udp_encapsulation_, network_);
 }
 
-void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
+void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, session_t *session_)
 {
     //  Retrieve PGM fds and start polling.
     int socket_fd;
@@ -67,7 +68,7 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
     set_pollin (pipe_handle);
     set_pollin (socket_handle);
 
-    sink = sink_;
+    session = session_;
 
     //  If there are any subscriptions already queued in the session, drop them.
     drop_subscriptions ();
@@ -93,7 +94,7 @@ void zmq::pgm_receiver_t::unplug ()
     rm_fd (socket_handle);
     rm_fd (pipe_handle);
 
-    sink = NULL;
+    session = NULL;
 }
 
 void zmq::pgm_receiver_t::terminate ()
@@ -220,7 +221,7 @@ void zmq::pgm_receiver_t::in_event ()
             it->second.decoder = new (std::nothrow) decoder_t (0,
                 options.maxmsgsize);
             alloc_assert (it->second.decoder);
-            it->second.decoder->set_sink (sink);
+            it->second.decoder->set_session (session);
         }
 
         mru_decoder = it->second.decoder;
@@ -246,7 +247,7 @@ void zmq::pgm_receiver_t::in_event ()
     }
 
     //  Flush any messages decoder may have produced.
-    sink->flush ();
+    session->flush ();
 }
 
 void zmq::pgm_receiver_t::timer_event (int token)
@@ -261,7 +262,7 @@ void zmq::pgm_receiver_t::timer_event (int token)
 void zmq::pgm_receiver_t::drop_subscriptions ()
 {
     msg_t msg;
-    while (sink->read (&msg))
+    while (session->read (&msg))
         msg.close ();
 }
 
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index aa010dd..f66c592 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -52,7 +52,7 @@ namespace zmq
         int init (bool udp_encapsulation_, const char *network_);
 
         //  i_engine interface implementation.
-        void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_);
+        void plug (class io_thread_t *io_thread_, class session_t *session_);
         void unplug ();
         void terminate ();
         void activate_in ();
@@ -105,7 +105,7 @@ namespace zmq
         options_t options;
 
         //  Associated session.
-        i_engine_sink *sink;
+        class session_t *session;
 
         //  Most recently used decoder.
         decoder_t *mru_decoder;
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 9b1e215..d2419ea 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -30,6 +30,7 @@
 
 #include "io_thread.hpp"
 #include "pgm_sender.hpp"
+#include "session.hpp"
 #include "err.hpp"
 #include "wire.hpp"
 #include "stdint.hpp"
@@ -61,7 +62,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
     return rc;
 }
 
-void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
+void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_t *session_)
 {
     //  Alocate 2 fds for PGM socket.
     int downlink_socket_fd = 0;
@@ -69,7 +70,7 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
     int rdata_notify_fd = 0;
     int pending_notify_fd = 0;
 
-    encoder.set_sink (sink_);
+    encoder.set_session (session_);
 
     //  Fill fds from PGM transport and add them to the poller.
     pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
@@ -94,9 +95,9 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
     //  subscribe for all the messages.
     msg_t msg;
     msg.init ();
-    bool ok = sink_->write (&msg);
+    bool ok = session_->write (&msg);
     zmq_assert (ok);
-    sink_->flush ();
+    session_->flush ();
 }
 
 void zmq::pgm_sender_t::unplug ()
@@ -115,7 +116,7 @@ void zmq::pgm_sender_t::unplug ()
     rm_fd (uplink_handle);
     rm_fd (rdata_notify_handle);
     rm_fd (pending_notify_handle);
-    encoder.set_sink (NULL);
+    encoder.set_session (NULL);
 }
 
 void zmq::pgm_sender_t::terminate ()
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index c29dd12..366e385 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -50,7 +50,7 @@ namespace zmq
         int init (bool udp_encapsulation_, const char *network_);
 
         //  i_engine interface implementation.
-        void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_);
+        void plug (class io_thread_t *io_thread_, class session_t *session_);
         void unplug ();
         void terminate ();
         void activate_in ();
diff --git a/src/session.cpp b/src/session.cpp
index 0dd0e34..ed9b44e 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -24,11 +24,16 @@
 #include "err.hpp"
 #include "pipe.hpp"
 #include "likely.hpp"
+#include "zmq_connecter.hpp"
+#include "pgm_sender.hpp"
+#include "pgm_receiver.hpp"
 
-zmq::session_t::session_t (class io_thread_t *io_thread_,
-      class socket_base_t *socket_, const options_t &options_) :
+zmq::session_t::session_t (class io_thread_t *io_thread_, bool connect_,
+      class socket_base_t *socket_, const options_t &options_,
+      const char *protocol_, const char *address_) :
     own_t (io_thread_, options_),
     io_object_t (io_thread_),
+    connect (connect_),
     pipe (NULL),
     incomplete_in (false),
     pending (false),
@@ -37,6 +42,10 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
     io_thread (io_thread_),
     has_linger_timer (false)
 {
+    if (protocol_)
+        protocol = protocol_;
+    if (address_)
+        address = address_;
 }
 
 zmq::session_t::~session_t ()
@@ -157,6 +166,8 @@ void zmq::session_t::hiccuped (pipe_t *pipe_)
 
 void zmq::session_t::process_plug ()
 {
+    if (connect)
+        start_connecting (false);
 }
 
 void zmq::session_t::process_attach (i_engine *engine_)
@@ -169,12 +180,6 @@ void zmq::session_t::process_attach (i_engine *engine_)
         return;
     }
 
-    //  Trigger the notfication event about the attachment.
-    if (!attached ()) {
-        delete engine_;
-        return;
-    }
-
     //  Create the pipe if it does not exist yet.
     if (!pipe && !is_terminating ()) {
         object_t *parents [2] = {this, socket};
@@ -271,25 +276,87 @@ void zmq::session_t::timer_event (int id_)
     pipe->terminate (false);
 }
 
-bool zmq::session_t::attached ()
-{
-    return xattached ();
-}
-
 void zmq::session_t::detached ()
 {
-    if (!xdetached ()) {
-
-        //  Derived session type have asked for session termination.
+    //  Transient session self-destructs after peer disconnects.
+    if (!connect) {
         terminate ();
         return;
     }
 
+    //  Reconnect.
+    start_connecting (true);
+
     //  For subscriber sockets we hiccup the inbound pipe, which will cause
     //  the socket object to resend all the subscriptions.
     if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
         pipe->hiccup ();  
 }
 
+void zmq::session_t::start_connecting (bool wait_)
+{
+    zmq_assert (connect);
+
+    //  Choose I/O thread to run connecter in. Given that we are already
+    //  running in an I/O thread, there must be at least one available.
+    io_thread_t *io_thread = choose_io_thread (options.affinity);
+    zmq_assert (io_thread);
+
+    //  Create the connecter object.
+
+    //  Both TCP and IPC transports are using the same infrastructure.
+    if (protocol == "tcp" || protocol == "ipc") {
+
+        zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
+            io_thread, this, options, protocol.c_str (), address.c_str (),
+            wait_);
+        alloc_assert (connecter);
+        launch_child (connecter);
+        return;
+    }
+
+#if defined ZMQ_HAVE_OPENPGM
 
+    //  Both PGM and EPGM transports are using the same infrastructure.
+    if (protocol == "pgm" || protocol == "epgm") {
+
+        //  For EPGM transport with UDP encapsulation of PGM is used.
+        bool udp_encapsulation = (protocol == "epgm");
+
+        //  At this point we'll create message pipes to the session straight
+        //  away. There's no point in delaying it as no concept of 'connect'
+        //  exists with PGM anyway.
+        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
+
+            //  PGM sender.
+            pgm_sender_t *pgm_sender =  new (std::nothrow) pgm_sender_t (
+                io_thread, options);
+            alloc_assert (pgm_sender);
+
+            int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
+            zmq_assert (rc == 0);
+
+            send_attach (this, pgm_sender);
+        }
+        else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) {
+
+            //  PGM receiver.
+            pgm_receiver_t *pgm_receiver =  new (std::nothrow) pgm_receiver_t (
+                io_thread, options);
+            alloc_assert (pgm_receiver);
+
+            int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
+            zmq_assert (rc == 0);
+
+            send_attach (this, pgm_receiver);
+        }
+        else
+            zmq_assert (false);
+
+        return;
+    }
+#endif
+
+    zmq_assert (false);
+}
 
diff --git a/src/session.hpp b/src/session.hpp
index 60aa7c5..a155357 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -21,6 +21,8 @@
 #ifndef __ZMQ_SESSION_HPP_INCLUDED__
 #define __ZMQ_SESSION_HPP_INCLUDED__
 
+#include <string>
+
 #include "own.hpp"
 #include "i_engine.hpp"
 #include "io_object.hpp"
@@ -32,18 +34,18 @@ namespace zmq
     class session_t :
         public own_t,
         public io_object_t,
-        public i_engine_sink,
         public i_pipe_events
     {
     public:
 
-        session_t (class io_thread_t *io_thread_,
-            class socket_base_t *socket_, const options_t &options_);
+        session_t (class io_thread_t *io_thread_, bool connect_,
+            class socket_base_t *socket_, const options_t &options_,
+            const char *protocol_, const char *address_);
 
         //  To be used once only, when creating the session.
         void attach_pipe (class pipe_t *pipe_);
 
-        //  i_engine_sink interface implementation.
+        //  Following functions are the interface exposed towards the engine.
         bool read (msg_t *msg_);
         bool write (msg_t *msg_);
         void flush ();
@@ -55,22 +57,12 @@ namespace zmq
         void hiccuped (class pipe_t *pipe_);
         void terminated (class pipe_t *pipe_);
 
-    protected:
-
-        //  Events from the engine. Attached is triggered when session is
-        //  attached to a peer. The function can reject the new peer by
-        //  returning false. Detached is triggered at the beginning of
-        //  the termination process when session is about to be detached from
-        //  the peer. If it returns false, session will be terminated.
-        //  To be overloaded by the derived session type.
-        virtual bool xattached () = 0;
-        virtual bool xdetached () = 0;
+    private:
 
         ~session_t ();
 
-    private:
+        void start_connecting (bool wait_);
 
-        bool attached ();
         void detached ();
 
         //  Handlers for incoming commands.
@@ -88,6 +80,10 @@ namespace zmq
         //  Call this function to move on with the delayed process_term.
         void proceed_with_term ();
 
+        //  If true, this session (re)connects to the peer. Otherwise, it's
+        //  a transient session created by the listener.
+        bool connect;
+
         //  Pipe connecting the session to its socket.
         class pipe_t *pipe;
 
@@ -115,6 +111,10 @@ namespace zmq
         //  True is linger timer is running.
         bool has_linger_timer;
 
+        //  Protocol and address to use when connecting.
+        std::string protocol;
+        std::string address;
+
         session_t (const session_t&);
         const session_t &operator = (const session_t&);
     };
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 92715b1..2ec3998 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -37,7 +37,7 @@
 #include "zmq_listener.hpp"
 #include "zmq_connecter.hpp"
 #include "io_thread.hpp"
-#include "connect_session.hpp"
+#include "session.hpp"
 #include "config.hpp"
 #include "clock.hpp"
 #include "pipe.hpp"
@@ -441,8 +441,8 @@ int zmq::socket_base_t::connect (const char *addr_)
     }
 
     //  Create session.
-    connect_session_t *session = new (std::nothrow) connect_session_t (
-        io_thread, this, options, protocol.c_str (), address.c_str ());
+    session_t *session = new (std::nothrow) session_t (
+        io_thread, true, this, options, protocol.c_str (), address.c_str ());
     alloc_assert (session);
 
     //  Create a bi-directional pipe.
diff --git a/src/transient_session.cpp b/src/transient_session.cpp
deleted file mode 100644
index d389ff4..0000000
--- a/src/transient_session.cpp
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
-    Copyright (c) 2007-2011 iMatix Corporation
-    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
-    This file is part of 0MQ.
-
-    0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the GNU Lesser General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    0MQ is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU Lesser General Public License for more details.
-
-    You should have received a copy of the GNU Lesser General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "transient_session.hpp"
-
-zmq::transient_session_t::transient_session_t (class io_thread_t *io_thread_,
-      class socket_base_t *socket_, const options_t &options_) :
-    session_t (io_thread_, socket_, options_)
-{
-}
-
-zmq::transient_session_t::~transient_session_t ()
-{
-}
-
-bool zmq::transient_session_t::xattached ()
-{
-    //  Transient session is always valid.
-    return true;
-}
-
-bool zmq::transient_session_t::xdetached ()
-{
-    //  There's no way to reestablish a transient session. Tear it down.
-    return false;
-}
diff --git a/src/transient_session.hpp b/src/transient_session.hpp
deleted file mode 100644
index eff6b65..0000000
--- a/src/transient_session.hpp
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
-    Copyright (c) 2007-2011 iMatix Corporation
-    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
-    This file is part of 0MQ.
-
-    0MQ is free software; you can redistribute it and/or modify it under
-    the terms of the GNU Lesser General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    0MQ is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU Lesser General Public License for more details.
-
-    You should have received a copy of the GNU Lesser General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_TRANSIENT_SESSION_HPP_INCLUDED__
-#define __ZMQ_TRANSIENT_SESSION_HPP_INCLUDED__
-
-#include "session.hpp"
-
-namespace zmq
-{
-
-    //  Transient session is created by the listener when the connected peer
-    //  stays anonymous. Transient session is destroyed on disconnect.
-
-    class transient_session_t : public session_t
-    {
-    public:
-
-        transient_session_t (class io_thread_t *io_thread_,
-            class socket_base_t *socket_, const options_t &options_);
-        ~transient_session_t ();
-
-    private:
-
-        //  Handlers for events from session base class.
-        bool xattached ();
-        bool xdetached ();
-
-        transient_session_t (const transient_session_t&);
-        const transient_session_t &operator = (const transient_session_t&);
-    };
-
-}
-
-#endif
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index b0a7df1..fa1bd45 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -29,6 +29,7 @@
 #include "zmq_engine.hpp"
 #include "zmq_connecter.hpp"
 #include "io_thread.hpp"
+#include "session.hpp"
 #include "config.hpp"
 #include "err.hpp"
 
@@ -39,8 +40,8 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) :
     outpos (NULL),
     outsize (0),
     encoder (out_batch_size),
-    sink (NULL),
-    ephemeral_sink (NULL),
+    session (NULL),
+    leftover_session (NULL),
     options (options_),
     plugged (false)
 {
@@ -54,18 +55,18 @@ zmq::zmq_engine_t::~zmq_engine_t ()
     zmq_assert (!plugged);
 }
 
-void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
+void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, session_t *session_)
 {
     zmq_assert (!plugged);
     plugged = true;
-    ephemeral_sink = NULL;
+    leftover_session = NULL;
 
-    //  Connect to session/init object.
-    zmq_assert (!sink);
-    zmq_assert (sink_);
-    encoder.set_sink (sink_);
-    decoder.set_sink (sink_);
-    sink = sink_;
+    //  Connect to session object.
+    zmq_assert (!session);
+    zmq_assert (session_);
+    encoder.set_session (session_);
+    decoder.set_session (session_);
+    session = session_;
 
     //  Connect to I/O threads poller object.
     io_object_t::plug (io_thread_);
@@ -88,11 +89,11 @@ void zmq::zmq_engine_t::unplug ()
     //  Disconnect from I/O threads poller object.
     io_object_t::unplug ();
 
-    //  Disconnect from init/session object.
-    encoder.set_sink (NULL);
-    decoder.set_sink (NULL);
-    ephemeral_sink = sink;
-    sink = NULL;
+    //  Disconnect from session object.
+    encoder.set_session (NULL);
+    decoder.set_session (NULL);
+    leftover_session = session;
+    session = NULL;
 }
 
 void zmq::zmq_engine_t::terminate ()
@@ -133,9 +134,7 @@ void zmq::zmq_engine_t::in_event ()
         //  Stop polling for input if we got stuck.
         if (processed < insize) {
 
-            //  This may happen if queue limits are in effect or when
-            //  init object reads all required information from the socket
-            //  and rejects to read more data.
+            //  This may happen if queue limits are in effect.
             if (plugged)
                 reset_pollin (handle);
         }
@@ -148,13 +147,13 @@ void zmq::zmq_engine_t::in_event ()
     //  Flush all messages the decoder may have produced.
     //  If IO handler has unplugged engine, flush transient IO handler.
     if (unlikely (!plugged)) {
-        zmq_assert (ephemeral_sink);
-        ephemeral_sink->flush ();
+        zmq_assert (leftover_session);
+        leftover_session->flush ();
     } else {
-        sink->flush ();
+        session->flush ();
     }
 
-    if (sink && disconnection)
+    if (session && disconnection)
         error ();
 }
 
@@ -168,8 +167,8 @@ void zmq::zmq_engine_t::out_event ()
 
         //  If IO handler has unplugged engine, flush transient IO handler.
         if (unlikely (!plugged)) {
-            zmq_assert (ephemeral_sink);
-            ephemeral_sink->flush ();
+            zmq_assert (leftover_session);
+            leftover_session->flush ();
             return;
         }
 
@@ -218,8 +217,8 @@ void zmq::zmq_engine_t::activate_in ()
 
 void zmq::zmq_engine_t::error ()
 {
-    zmq_assert (sink);
-    sink->detach ();
+    zmq_assert (session);
+    session->detach ();
     unplug ();
     delete this;
 }
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index 7f09775..ad4bc8a 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -43,7 +43,7 @@ namespace zmq
         ~zmq_engine_t ();
 
         //  i_engine interface implementation.
-        void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_);
+        void plug (class io_thread_t *io_thread_, class session_t *session_);
         void unplug ();
         void terminate ();
         void activate_in ();
@@ -69,10 +69,11 @@ namespace zmq
         size_t outsize;
         encoder_t encoder;
 
-        i_engine_sink *sink;
+        //  The session this engine is attached to.
+        class session_t *session;
 
-        //  Detached transient sink.
-        i_engine_sink *ephemeral_sink;
+        //  Detached transient session.
+        class session_t *leftover_session;
 
         options_t options;
 
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index 2d5e7bb..cb73ad8 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -21,9 +21,9 @@
 #include <new>
 
 #include "zmq_listener.hpp"
-#include "transient_session.hpp"
 #include "zmq_engine.hpp"
 #include "io_thread.hpp"
+#include "session.hpp"
 #include "err.hpp"
 
 zmq::zmq_listener_t::zmq_listener_t (io_thread_t *io_thread_,
@@ -74,8 +74,8 @@ void zmq::zmq_listener_t::in_event ()
     zmq_assert (io_thread);
 
     //  Create and launch a session object. 
-    transient_session_t *session = new (std::nothrow)
-        transient_session_t (io_thread, socket, options);
+    session_t *session = new (std::nothrow)
+        session_t (io_thread, false, socket, options, NULL, NULL);
     alloc_assert (session);
     session->inc_seqnum ();
     launch_child (session);
-- 
cgit v1.2.3