From acf0b0e515515e51ad32ba7a2d147ce703579478 Mon Sep 17 00:00:00 2001
From: Martin Sustrik <sustrik@250bpm.com>
Date: Sun, 22 May 2011 17:26:53 +0200
Subject: Introduces bi-directional pipes

So far, there was a pair of unidirectional pipes between a socket
and a session (or an inproc peer). This resulted in complex
problems with half-closed states and tracking which inpipe
corresponds to which outpipe.

This patch doesn't add any functionality in itself, but is
essential for further work on features like subscription
forwarding.

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
---
 src/array.hpp           |  79 +++++++++---
 src/command.hpp         |  11 +-
 src/connect_session.cpp |   1 +
 src/dist.cpp            |  10 +-
 src/dist.hpp            |  13 +-
 src/fq.cpp              |  12 +-
 src/fq.hpp              |  12 +-
 src/lb.cpp              |   8 +-
 src/lb.hpp              |  14 +-
 src/object.cpp          |  40 +++---
 src/object.hpp          |  19 ++-
 src/options.cpp         |   2 -
 src/options.hpp         |   5 -
 src/own.cpp             |   3 +
 src/pair.cpp            |  98 ++++----------
 src/pair.hpp            |  24 +---
 src/pipe.cpp            | 335 +++++++++++++++++++++++-------------------------
 src/pipe.hpp            | 201 ++++++++++++-----------------
 src/pull.cpp            |  26 +++-
 src/pull.hpp            |  13 +-
 src/push.cpp            |  26 +++-
 src/push.hpp            |  13 +-
 src/session.cpp         | 167 ++++++++++--------------
 src/session.hpp         |  29 ++---
 src/socket_base.cpp     |  73 +++++------
 src/socket_base.hpp     |  12 +-
 src/xpub.cpp            |  26 +++-
 src/xpub.hpp            |  12 +-
 src/xrep.cpp            |  90 +++++--------
 src/xrep.hpp            |  24 ++--
 src/xreq.cpp            |  28 +++-
 src/xreq.hpp            |  13 +-
 src/xsub.cpp            |  26 +++-
 src/xsub.hpp            |  13 +-
 34 files changed, 709 insertions(+), 769 deletions(-)

diff --git a/src/array.hpp b/src/array.hpp
index 1d18e48..e7b5266 100644
--- a/src/array.hpp
+++ b/src/array.hpp
@@ -28,14 +28,17 @@ namespace zmq
 {
 
     //  Base class for objects stored in the array. Note that each object can
-    //  be stored in at most one array.
+    //  be stored in at most two arrays. This is needed specifically in the
+    //  case where single pipe object is stored both in array of inbound pipes
+    //  and in the array of outbound pipes.
 
     class array_item_t
     {
     public:
 
         inline array_item_t () :
-            array_index (-1)
+            array_index1 (-1),
+            array_index2 (-1)
         {
         }
 
@@ -45,19 +48,30 @@ namespace zmq
         {
         }
 
-        inline void set_array_index (int index_)
+        inline void set_array_index1 (int index_)
         {
-            array_index = index_;
+            array_index1 = index_;
         }
 
-        inline int get_array_index ()
+        inline int get_array_index1 ()
         {
-            return array_index;
+            return array_index1;
+        }
+
+        inline void set_array_index2 (int index_)
+        {
+            array_index2 = index_;
+        }
+
+        inline int get_array_index2 ()
+        {
+            return array_index2;
         }
 
     private:
 
-        int array_index;
+        int array_index1;
+        int array_index2;
 
         array_item_t (const array_item_t&);
         const array_item_t &operator = (const array_item_t&);
@@ -65,9 +79,11 @@ namespace zmq
 
     //  Fast array implementation with O(1) access to item, insertion and
     //  removal. Array stores pointers rather than objects. The objects have
-    //  to be derived from array_item_t class.
+    //  to be derived from array_item_t class, thus they can be stored in
+    //  two arrays. Template parameter N specifies which index in array_item_t
+    //  to use.
 
-    template <typename T> class array_t
+    template <typename T, int N = 1> class array_t
     {
     public:
 
@@ -98,28 +114,48 @@ namespace zmq
 
         inline void push_back (T *item_)
         {
-            if (item_)
-                item_->set_array_index ((int) items.size ());
+            if (item_) {
+                if (N == 1)
+                    item_->set_array_index1 ((int) items.size ());
+                else
+                    item_->set_array_index2 ((int) items.size ());
+            }
             items.push_back (item_);
         }
 
-        inline void erase (T *item_) {
-            erase (item_->get_array_index ());
+        inline void erase (T *item_)
+        {
+            if (N == 1)
+                erase (item_->get_array_index1 ());
+            else
+                erase (item_->get_array_index2 ());
         }
 
         inline void erase (size_type index_) {
-            if (items.back ())
-                items.back ()->set_array_index ((int) index_);
+            if (items.back ()) {
+                if (N == 1)
+                    items.back ()->set_array_index1 ((int) index_);
+                else
+                    items.back ()->set_array_index2 ((int) index_);
+            }
             items [index_] = items.back ();
             items.pop_back ();
         }
 
         inline void swap (size_type index1_, size_type index2_)
         {
-            if (items [index1_])
-                items [index1_]->set_array_index ((int) index2_);
-            if (items [index2_])
-                items [index2_]->set_array_index ((int) index1_);
+            if (N == 1) {
+		        if (items [index1_])
+		            items [index1_]->set_array_index1 ((int) index2_);
+		        if (items [index2_])
+		            items [index2_]->set_array_index1 ((int) index1_);
+            }
+            else {
+		        if (items [index1_])
+		            items [index1_]->set_array_index2 ((int) index2_);
+		        if (items [index2_])
+		            items [index2_]->set_array_index2 ((int) index1_);
+            }
             std::swap (items [index1_], items [index2_]);
         }
 
@@ -130,7 +166,10 @@ namespace zmq
 
         inline size_type index (T *item_)
         {
-            return (size_type) item_->get_array_index ();
+            if (N == 1)
+                return (size_type) item_->get_array_index1 ();
+            else
+                return (size_type) item_->get_array_index2 ();
         }
 
     private:
diff --git a/src/command.hpp b/src/command.hpp
index 35aed0f..ff7b551 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -40,8 +40,8 @@ namespace zmq
             own,
             attach,
             bind,
-            activate_reader,
-            activate_writer,
+            activate_read,
+            activate_write,
             pipe_term,
             pipe_term_ack,
             term_req,
@@ -79,8 +79,7 @@ namespace zmq
             //  Sent from session to socket to establish pipe(s) between them.
             //  Caller have used inc_seqnum beforehand sending the command.
             struct {
-                class reader_t *in_pipe;
-                class writer_t *out_pipe;
+                class pipe_t *pipe;
                 unsigned char peer_identity_size;
                 unsigned char *peer_identity;
             } bind;
@@ -88,13 +87,13 @@ namespace zmq
             //  Sent by pipe writer to inform dormant pipe reader that there
             //  are messages in the pipe.
             struct {
-            } activate_reader;
+            } activate_read;
 
             //  Sent by pipe reader to inform pipe writer about how many
             //  messages it has read so far.
             struct {
                 uint64_t msgs_read;
-            } activate_writer;
+            } activate_write;
 
             //  Sent by pipe reader to pipe writer to ask it to terminate
             //  its end of the pipe.
diff --git a/src/connect_session.cpp b/src/connect_session.cpp
index c0951cb..9a29bf1 100644
--- a/src/connect_session.cpp
+++ b/src/connect_session.cpp
@@ -22,6 +22,7 @@
 #include "zmq_connecter.hpp"
 #include "pgm_sender.hpp"
 #include "pgm_receiver.hpp"
+#include "err.hpp"
 
 zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_,
       class socket_base_t *socket_, const options_t &options_,
diff --git a/src/dist.cpp b/src/dist.cpp
index 7c15bfd..3afa196 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -39,10 +39,8 @@ zmq::dist_t::~dist_t ()
     zmq_assert (pipes.empty ());
 }
 
-void zmq::dist_t::attach (writer_t *pipe_)
+void zmq::dist_t::attach (pipe_t *pipe_)
 {
-    pipe_->set_event_sink (this);
-
     //  If we are in the middle of sending a message, we'll add new pipe
     //  into the list of eligible pipes. Otherwise we add it to the list
     //  of active pipes.
@@ -74,7 +72,7 @@ void zmq::dist_t::terminate ()
         pipes [i]->terminate ();
 }
 
-void zmq::dist_t::terminated (writer_t *pipe_)
+void zmq::dist_t::terminated (pipe_t *pipe_)
 {
     //  Remove the pipe from the list; adjust number of active and/or
     //  eligible pipes accordingly.
@@ -88,7 +86,7 @@ void zmq::dist_t::terminated (writer_t *pipe_)
         sink->unregister_term_ack ();
 }
 
-void zmq::dist_t::activated (writer_t *pipe_)
+void zmq::dist_t::activated (pipe_t *pipe_)
 {
     //  Move the pipe from passive to eligible state.
     pipes.swap (pipes.index (pipe_), eligible);
@@ -153,7 +151,7 @@ bool zmq::dist_t::has_out ()
     return true;
 }
 
-bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_)
+bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
 {
     if (!pipe_->write (msg_)) {
         pipes.swap (pipes.index (pipe_), active - 1);
diff --git a/src/dist.hpp b/src/dist.hpp
index fd522b9..c137332 100644
--- a/src/dist.hpp
+++ b/src/dist.hpp
@@ -31,33 +31,32 @@ namespace zmq
 
     //  Class manages a set of outbound pipes. It sends each messages to
     //  each of them.
-    class dist_t : public i_writer_events
+    class dist_t
     {
     public:
 
         dist_t (class own_t *sink_);
         ~dist_t ();
 
-        void attach (writer_t *pipe_);
+        void attach (class pipe_t *pipe_);
         void terminate ();
         int send (class msg_t *msg_, int flags_);
         bool has_out ();
 
-        //  i_writer_events interface implementation.
-        void activated (writer_t *pipe_);
-        void terminated (writer_t *pipe_);
+        void activated (class pipe_t *pipe_);
+        void terminated (class pipe_t *pipe_);
 
     private:
 
         //  Write the message to the pipe. Make the pipe inactive if writing
         //  fails. In such a case false is returned.
-        bool write (class writer_t *pipe_, class msg_t *msg_);
+        bool write (class pipe_t *pipe_, class msg_t *msg_);
 
         //  Put the message to all active pipes.
         void distribute (class msg_t *msg_, int flags_);
 
         //  List of outbound pipes.
-        typedef array_t <class writer_t> pipes_t;
+        typedef array_t <class pipe_t, 2> pipes_t;
         pipes_t pipes;
 
         //  Number of active pipes. All the active pipes are located at the
diff --git a/src/fq.cpp b/src/fq.cpp
index 392e554..b4ee641 100644
--- a/src/fq.cpp
+++ b/src/fq.cpp
@@ -38,10 +38,8 @@ zmq::fq_t::~fq_t ()
     zmq_assert (pipes.empty ());
 }
 
-void zmq::fq_t::attach (reader_t *pipe_)
+void zmq::fq_t::attach (pipe_t *pipe_)
 {
-    pipe_->set_event_sink (this);
-
     pipes.push_back (pipe_);
     pipes.swap (active, pipes.size () - 1);
     active++;
@@ -53,7 +51,7 @@ void zmq::fq_t::attach (reader_t *pipe_)
     }
 }
 
-void zmq::fq_t::terminated (reader_t *pipe_)
+void zmq::fq_t::terminated (pipe_t *pipe_)
 {
     //  Make sure that we are not closing current pipe while
     //  message is half-read.
@@ -72,10 +70,6 @@ void zmq::fq_t::terminated (reader_t *pipe_)
         sink->unregister_term_ack ();
 }
 
-void zmq::fq_t::delimited (reader_t *pipe_)
-{
-}
-
 void zmq::fq_t::terminate ()
 {
     zmq_assert (!terminating);
@@ -86,7 +80,7 @@ void zmq::fq_t::terminate ()
         pipes [i]->terminate ();
 }
 
-void zmq::fq_t::activated (reader_t *pipe_)
+void zmq::fq_t::activated (pipe_t *pipe_)
 {
     //  Move the pipe to the list of active pipes.
     pipes.swap (pipes.index (pipe_), active);
diff --git a/src/fq.hpp b/src/fq.hpp
index c35d458..bbe1b59 100644
--- a/src/fq.hpp
+++ b/src/fq.hpp
@@ -31,28 +31,26 @@ namespace zmq
     //  Class manages a set of inbound pipes. On receive it performs fair
     //  queueing (RFC970) so that senders gone berserk won't cause denial of
     //  service for decent senders.
-    class fq_t : public i_reader_events
+    class fq_t
     {
     public:
 
         fq_t (class own_t *sink_);
         ~fq_t ();
 
-        void attach (reader_t *pipe_);
+        void attach (pipe_t *pipe_);
         void terminate ();
 
         int recv (msg_t *msg_, int flags_);
         bool has_in ();
 
-        //  i_reader_events implementation.
-        void activated (reader_t *pipe_);
-        void terminated (reader_t *pipe_);
-        void delimited (reader_t *pipe_);
+        void activated (pipe_t *pipe_);
+        void terminated (pipe_t *pipe_);
 
     private:
 
         //  Inbound pipes.
-        typedef array_t <reader_t> pipes_t;
+        typedef array_t <pipe_t, 1> pipes_t;
         pipes_t pipes;
 
         //  Number of active pipes. All the active pipes are located at the
diff --git a/src/lb.cpp b/src/lb.cpp
index 8eb9157..2ba902a 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -39,10 +39,8 @@ zmq::lb_t::~lb_t ()
     zmq_assert (pipes.empty ());
 }
 
-void zmq::lb_t::attach (writer_t *pipe_)
+void zmq::lb_t::attach (pipe_t *pipe_)
 {
-    pipe_->set_event_sink (this);
-
     pipes.push_back (pipe_);
     pipes.swap (active, pipes.size () - 1);
     active++;
@@ -63,7 +61,7 @@ void zmq::lb_t::terminate ()
         pipes [i]->terminate ();
 }
 
-void zmq::lb_t::terminated (writer_t *pipe_)
+void zmq::lb_t::terminated (pipe_t *pipe_)
 {
     pipes_t::size_type index = pipes.index (pipe_);
 
@@ -85,7 +83,7 @@ void zmq::lb_t::terminated (writer_t *pipe_)
         sink->unregister_term_ack ();
 }
 
-void zmq::lb_t::activated (writer_t *pipe_)
+void zmq::lb_t::activated (pipe_t *pipe_)
 {
     //  Move the pipe to the list of active pipes.
     pipes.swap (pipes.index (pipe_), active);
diff --git a/src/lb.hpp b/src/lb.hpp
index f844b01..d764f6d 100644
--- a/src/lb.hpp
+++ b/src/lb.hpp
@@ -27,28 +27,28 @@
 namespace zmq
 {
 
-    //  Class manages a set of outbound pipes. On send it load balances
+    //  This class manages a set of outbound pipes. On send it load balances
     //  messages fairly among the pipes.
-    class lb_t : public i_writer_events
+
+    class lb_t
     {
     public:
 
         lb_t (class own_t *sink_);
         ~lb_t ();
 
-        void attach (writer_t *pipe_);
+        void attach (pipe_t *pipe_);
         void terminate ();
         int send (msg_t *msg_, int flags_);
         bool has_out ();
 
-        //  i_writer_events interface implementation.
-        void activated (writer_t *pipe_);
-        void terminated (writer_t *pipe_);
+        void activated (pipe_t *pipe_);
+        void terminated (pipe_t *pipe_);
 
     private:
 
         //  List of outbound pipes.
-        typedef array_t <class writer_t> pipes_t;
+        typedef array_t <class pipe_t, 2> pipes_t;
         pipes_t pipes;
 
         //  Number of active pipes. All the active pipes are located at the
diff --git a/src/object.cpp b/src/object.cpp
index e2ca6d6..0a06d5f 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -59,12 +59,12 @@ void zmq::object_t::process_command (command_t &cmd_)
 {
     switch (cmd_.type) {
 
-    case command_t::activate_reader:
-        process_activate_reader ();
+    case command_t::activate_read:
+        process_activate_read ();
         break;
 
-    case command_t::activate_writer:
-        process_activate_writer (cmd_.args.activate_writer.msgs_read);
+    case command_t::activate_write:
+        process_activate_write (cmd_.args.activate_write.msgs_read);
         break;
 
     case command_t::stop:
@@ -90,8 +90,8 @@ void zmq::object_t::process_command (command_t &cmd_)
         break;
 
     case command_t::bind:
-        process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe,
-            cmd_.args.bind.peer_identity ? blob_t (cmd_.args.bind.peer_identity,
+        process_bind (cmd_.args.bind.pipe, cmd_.args.bind.peer_identity ?
+            blob_t (cmd_.args.bind.peer_identity,
             cmd_.args.bind.peer_identity_size) : blob_t ());
         process_seqnum ();
         break;
@@ -236,8 +236,8 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
     send_command (cmd);
 }
 
-void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_,
-    writer_t *out_pipe_, const blob_t &peer_identity_, bool inc_seqnum_)
+void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
+    const blob_t &peer_identity_, bool inc_seqnum_)
 {
     if (inc_seqnum_)
         destination_->inc_seqnum ();
@@ -248,8 +248,7 @@ void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_,
 #endif
     cmd.destination = destination_;
     cmd.type = command_t::bind;
-    cmd.args.bind.in_pipe = in_pipe_;
-    cmd.args.bind.out_pipe = out_pipe_;
+    cmd.args.bind.pipe = pipe_;
     if (peer_identity_.empty ()) {
         cmd.args.bind.peer_identity_size = 0;
         cmd.args.bind.peer_identity = NULL;
@@ -267,18 +266,18 @@ void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_,
     send_command (cmd);
 }
 
-void zmq::object_t::send_activate_reader (reader_t *destination_)
+void zmq::object_t::send_activate_read (pipe_t *destination_)
 {
     command_t cmd;
 #if defined ZMQ_MAKE_VALGRIND_HAPPY
     memset (&cmd, 0, sizeof (cmd));
 #endif
     cmd.destination = destination_;
-    cmd.type = command_t::activate_reader;
+    cmd.type = command_t::activate_read;
     send_command (cmd);
 }
 
-void zmq::object_t::send_activate_writer (writer_t *destination_,
+void zmq::object_t::send_activate_write (pipe_t *destination_,
     uint64_t msgs_read_)
 {
     command_t cmd;
@@ -286,12 +285,12 @@ void zmq::object_t::send_activate_writer (writer_t *destination_,
     memset (&cmd, 0, sizeof (cmd));
 #endif
     cmd.destination = destination_;
-    cmd.type = command_t::activate_writer;
-    cmd.args.activate_writer.msgs_read = msgs_read_;
+    cmd.type = command_t::activate_write;
+    cmd.args.activate_write.msgs_read = msgs_read_;
     send_command (cmd);
 }
 
-void zmq::object_t::send_pipe_term (writer_t *destination_)
+void zmq::object_t::send_pipe_term (pipe_t *destination_)
 {
     command_t cmd;
 #if defined ZMQ_MAKE_VALGRIND_HAPPY
@@ -302,7 +301,7 @@ void zmq::object_t::send_pipe_term (writer_t *destination_)
     send_command (cmd);
 }
 
-void zmq::object_t::send_pipe_term_ack (reader_t *destination_)
+void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
 {
     command_t cmd;
 #if defined ZMQ_MAKE_VALGRIND_HAPPY
@@ -404,18 +403,17 @@ void zmq::object_t::process_attach (i_engine *engine_,
     zmq_assert (false);
 }
 
-void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
-    const blob_t &peer_identity_)
+void zmq::object_t::process_bind (pipe_t *pipe_, const blob_t &peer_identity_)
 {
     zmq_assert (false);
 }
 
-void zmq::object_t::process_activate_reader ()
+void zmq::object_t::process_activate_read ()
 {
     zmq_assert (false);
 }
 
-void zmq::object_t::process_activate_writer (uint64_t msgs_read_)
+void zmq::object_t::process_activate_write (uint64_t msgs_read_)
 {
     zmq_assert (false);
 }
diff --git a/src/object.hpp b/src/object.hpp
index 0f5e61b..0f47670 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -66,14 +66,13 @@ namespace zmq
         void send_attach (class session_t *destination_,
              struct i_engine *engine_, const blob_t &peer_identity_,
              bool inc_seqnum_ = true);
-        void send_bind (class own_t *destination_,
-             class reader_t *in_pipe_, class writer_t *out_pipe_,
+        void send_bind (class own_t *destination_, class pipe_t *pipe_,
              const blob_t &peer_identity_, bool inc_seqnum_ = true);
-        void send_activate_reader (class reader_t *destination_);
-        void send_activate_writer (class writer_t *destination_,
+        void send_activate_read (class pipe_t *destination_);
+        void send_activate_write (class pipe_t *destination_,
              uint64_t msgs_read_);
-        void send_pipe_term (class writer_t *destination_);
-        void send_pipe_term_ack (class reader_t *destination_);
+        void send_pipe_term (class pipe_t *destination_);
+        void send_pipe_term_ack (class pipe_t *destination_);
         void send_term_req (class own_t *destination_,
             class own_t *object_);
         void send_term (class own_t *destination_, int linger_);
@@ -89,10 +88,10 @@ namespace zmq
         virtual void process_own (class own_t *object_);
         virtual void process_attach (struct i_engine *engine_,
             const blob_t &peer_identity_);
-        virtual void process_bind (class reader_t *in_pipe_,
-            class writer_t *out_pipe_, const blob_t &peer_identity_);
-        virtual void process_activate_reader ();
-        virtual void process_activate_writer (uint64_t msgs_read_);
+        virtual void process_bind (class pipe_t *pipe_,
+            const blob_t &peer_identity_);
+        virtual void process_activate_read ();
+        virtual void process_activate_write (uint64_t msgs_read_);
         virtual void process_pipe_term ();
         virtual void process_pipe_term_ack ();
         virtual void process_term_req (class own_t *object_);
diff --git a/src/options.cpp b/src/options.cpp
index 897e0f5..271ebdb 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -38,8 +38,6 @@ zmq::options_t::options_t () :
     reconnect_ivl_max (0),
     backlog (100),
     maxmsgsize (-1),
-    requires_in (false),
-    requires_out (false),
     immediate_connect (true)
 {
 }
diff --git a/src/options.hpp b/src/options.hpp
index 53d0197..fd39a74 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -75,11 +75,6 @@ namespace zmq
         //  Maximal size of message to handle.
         int64_t maxmsgsize;
 
-        //  These options are never set by the user directly. Instead they are
-        //  provided by the specific socket type.
-        bool requires_in;
-        bool requires_out;
-
         //  If true, when connecting, pipes are created immediately without
         //  waiting for the connection to be established. That way the socket
         //  is not aware of the peer's identity, however, it is able to send
diff --git a/src/own.cpp b/src/own.cpp
index 4cbfdd6..cdf20a4 100644
--- a/src/own.cpp
+++ b/src/own.cpp
@@ -173,6 +173,7 @@ void zmq::own_t::process_term (int linger_)
 void zmq::own_t::register_term_acks (int count_)
 {
     term_acks += count_;
+    printf ("reg %d acks (%p, %d)\n", count_, (void*) this, term_acks);
 }
 
 void zmq::own_t::unregister_term_ack ()
@@ -180,6 +181,8 @@ void zmq::own_t::unregister_term_ack ()
     zmq_assert (term_acks > 0);
     term_acks--;
 
+    printf ("unreg 1 acks (%p, %d)\n", (void*) this, term_acks);
+
     //  This may be a last ack we are waiting for before termination...
     check_term_acks (); 
 }
diff --git a/src/pair.cpp b/src/pair.cpp
index d877b54..93a4327 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -25,111 +25,72 @@
 
 zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) :
     socket_base_t (parent_, tid_),
-    inpipe (NULL),
-    outpipe (NULL),
-    inpipe_alive (false),
-    outpipe_alive (false),
+    pipe (NULL),
     terminating (false)
 {
     options.type = ZMQ_PAIR;
-    options.requires_in = true;
-    options.requires_out = true;
 }
 
 zmq::pair_t::~pair_t ()
 {
-    zmq_assert (!inpipe);
-    zmq_assert (!outpipe);
+    zmq_assert (!pipe);
 }
 
-void zmq::pair_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
-    const blob_t &peer_identity_)
+void zmq::pair_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
 {
-    zmq_assert (!inpipe && !outpipe);
+    zmq_assert (!pipe);
 
-    inpipe = inpipe_;
-    inpipe_alive = true;
-    inpipe->set_event_sink (this);
-
-    outpipe = outpipe_;
-    outpipe_alive = true;
-    outpipe->set_event_sink (this);
+    pipe = pipe_;
+    pipe->set_event_sink (this);
 
     if (terminating) {
-        register_term_acks (2);
-        inpipe_->terminate ();
-        outpipe_->terminate ();
+        register_term_acks (1);
+        pipe_->terminate ();
     }
 }
 
-void zmq::pair_t::terminated (reader_t *pipe_)
-{
-    zmq_assert (pipe_ == inpipe);
-    inpipe = NULL;
-    inpipe_alive = false;
-
-    if (terminating)
-        unregister_term_ack ();
-}
-
-void zmq::pair_t::terminated (writer_t *pipe_)
+void zmq::pair_t::terminated (pipe_t *pipe_)
 {
-    zmq_assert (pipe_ == outpipe);
-    outpipe = NULL;
-    outpipe_alive = false;
+    zmq_assert (pipe_ == pipe);
+    pipe = NULL;
 
     if (terminating)
         unregister_term_ack ();
 }
 
-void  zmq::pair_t::delimited (reader_t *pipe_)
-{
-}
-
 void zmq::pair_t::process_term (int linger_)
 {
     terminating = true;
 
-    if (inpipe) {
+    if (pipe) {
         register_term_acks (1);
-        inpipe->terminate ();
-    }
-
-    if (outpipe) {
-        register_term_acks (1);
-        outpipe->terminate ();
+        pipe->terminate ();
     }
 
     socket_base_t::process_term (linger_);
 }
 
-void zmq::pair_t::activated (class reader_t *pipe_)
+void zmq::pair_t::read_activated (pipe_t *pipe_)
 {
-    zmq_assert (!inpipe_alive);
-    inpipe_alive = true;
+    //  There's just one pipe. No lists of active and inactive pipes.
+    //  There's nothing to do here.
 }
 
-void zmq::pair_t::activated (class writer_t *pipe_)
+void zmq::pair_t::write_activated (pipe_t *pipe_)
 {
-    zmq_assert (!outpipe_alive);
-    outpipe_alive = true;
+    //  There's just one pipe. No lists of active and inactive pipes.
+    //  There's nothing to do here.
 }
 
 int zmq::pair_t::xsend (msg_t *msg_, int flags_)
 {
-    if (outpipe == NULL || !outpipe_alive) {
-        errno = EAGAIN;
-        return -1;
-    }
-
-    if (!outpipe->write (msg_)) {
-        outpipe_alive = false;
+    if (!pipe || !pipe->write (msg_)) {
         errno = EAGAIN;
         return -1;
     }
 
     if (!(flags_ & ZMQ_SNDMORE))
-        outpipe->flush ();
+        pipe->flush ();
 
     //  Detach the original message from the data buffer.
     int rc = msg_->init ();
@@ -144,14 +105,12 @@ int zmq::pair_t::xrecv (msg_t *msg_, int flags_)
     int rc = msg_->close ();
     errno_assert (rc == 0);
 
-    if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) {
-
-        //  No message is available.
-        inpipe_alive = false;
+    if (!pipe || !pipe->read (msg_)) {
 
         //  Initialise the output parameter to be a 0-byte message.
         rc = msg_->init ();
         errno_assert (rc == 0);
+
         errno = EAGAIN;
         return -1;
     }
@@ -160,24 +119,23 @@ int zmq::pair_t::xrecv (msg_t *msg_, int flags_)
 
 bool zmq::pair_t::xhas_in ()
 {
-    if (!inpipe || !inpipe_alive)
+    if (!pipe)
         return false;
 
-    inpipe_alive = inpipe->check_read ();
-    return inpipe_alive;
+    return pipe->check_read ();
 }
 
 bool zmq::pair_t::xhas_out ()
 {
-    if (!outpipe || !outpipe_alive)
+    if (!pipe)
         return false;
 
     msg_t msg;
     int rc = msg.init ();
     errno_assert (rc == 0);
-    outpipe_alive = outpipe->check_write (&msg);
+    bool result = pipe->check_write (&msg);
     rc = msg.close ();
     errno_assert (rc == 0);
-    return outpipe_alive;
+    return result;
 }
 
diff --git a/src/pair.hpp b/src/pair.hpp
index a10e15a..2cb050a 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -29,8 +29,7 @@ namespace zmq
 
     class pair_t :
         public socket_base_t,
-        public i_reader_events,
-        public i_writer_events
+        public i_pipe_events
     {
     public:
 
@@ -38,32 +37,23 @@ namespace zmq
         ~pair_t ();
 
         //  Overloads of functions from socket_base_t.
-        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
-            const blob_t &peer_identity_);
+        void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
         int xsend (class msg_t *msg_, int flags_);
         int xrecv (class msg_t *msg_, int flags_);
         bool xhas_in ();
         bool xhas_out ();
 
-        //  i_reader_events interface implementation.
-        void activated (class reader_t *pipe_);
-        void terminated (class reader_t *pipe_);
-        void delimited (class reader_t *pipe_);
-
-        //  i_writer_events interface implementation.
-        void activated (class writer_t *pipe_);
-        void terminated (class writer_t *pipe_);
+        //  i_pipe_events interface implementation.
+        void read_activated (class pipe_t *pipe_);
+        void write_activated (class pipe_t *pipe_);
+        void terminated (class pipe_t *pipe_);
 
     private:
 
         //  Hook into termination process.
         void process_term (int linger_);
 
-        class reader_t *inpipe;
-        class writer_t *outpipe;
-
-        bool inpipe_alive;
-        bool outpipe_alive;
+        class pipe_t *pipe;
 
         bool terminating;
 
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 36dc808..fb03042 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -19,100 +19,123 @@
 */
 
 #include <new>
+#include <stddef.h>
 
 #include "pipe.hpp"
-#include "likely.hpp"
+#include "err.hpp"
 
-zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, int lwm_) :
+int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
+    int hwms_ [2], bool delays_ [2])
+{
+    //   Creates two pipe objects. These objects are connected by two ypipes,
+    //   each to pass messages in one direction.
+
+    pipe_t::upipe_t *upipe1 = new (std::nothrow) pipe_t::upipe_t ();
+    alloc_assert (upipe1);
+    pipe_t::upipe_t *upipe2 = new (std::nothrow) pipe_t::upipe_t ();
+    alloc_assert (upipe2);
+
+    pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
+        hwms_ [1], hwms_ [0], delays_ [0]);
+    alloc_assert (pipes_ [0]);
+    pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
+        hwms_ [0], hwms_ [1], delays_ [1]);
+    alloc_assert (pipes_ [1]);
+
+    pipes_ [0]->set_peer (pipes_ [1]);
+    pipes_ [1]->set_peer (pipes_ [0]);
+
+    return 0;
+}
+
+zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
+      int inhwm_, int outhwm_, bool delay_) :
     object_t (parent_),
-    active (true),
-    pipe (pipe_),
-    writer (NULL),
-    lwm (lwm_),
+    inpipe (inpipe_),
+    outpipe (outpipe_),
+    in_active (true),
+    out_active (true),
+    hwm (outhwm_),
+    lwm (compute_lwm (inhwm_)),
     msgs_read (0),
+    msgs_written (0),
+    peers_msgs_read (0),
+    peer (NULL),
     sink (NULL),
-    terminating (false)
+    terminating (false),
+    term_recvd (false),
+    delimited (false),
+    delay (delay_)
 {
-    //  Note that writer is not set here. Writer will inform reader about its
-    //  address once it is created (via set_writer method).
 }
 
-void zmq::reader_t::set_writer (writer_t *writer_)
+zmq::pipe_t::~pipe_t ()
 {
-    zmq_assert (!writer);
-    writer = writer_;
 }
 
-zmq::reader_t::~reader_t ()
+void zmq::pipe_t::set_peer (pipe_t *peer_)
 {
-    //  Pipe as such is owned and deallocated by reader object.
-    //  The point is that reader processes the last step of terminal
-    //  handshaking (term_ack).
-    zmq_assert (pipe);
-
-    //  First delete all the unread messages in the pipe. We have to do it by
-    //  hand because msg_t doesn't have automatic destructor.
-    msg_t msg;
-    while (pipe->read (&msg)) {
-       int rc = msg.close ();
-       errno_assert (rc == 0);
-    }
-
-    delete pipe;
+    //  Peer can be set once only.
+    zmq_assert (!peer);
+    peer = peer_;
 }
 
-void zmq::reader_t::set_event_sink (i_reader_events *sink_)
+void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
 {
+    // Sink can be set once only.
     zmq_assert (!sink);
     sink = sink_;
 }
 
-bool zmq::reader_t::is_delimiter (msg_t &msg_)
-{
-    return msg_.is_delimiter ();
-}
-
-bool zmq::reader_t::check_read ()
+bool zmq::pipe_t::check_read ()
 {
-    if (!active)
+    if (unlikely (!in_active))
         return false;
 
     //  Check if there's an item in the pipe.
-    if (!pipe->check_read ()) {
-        active = false;
+    if (!inpipe->check_read ()) {
+        in_active = false;
         return false;
     }
 
     //  If the next item in the pipe is message delimiter,
-    //  initiate its termination.
-    if (pipe->probe (is_delimiter)) {
+    //  initiate termination process.
+    if (inpipe->probe (is_delimiter)) {
         msg_t msg;
-        bool ok = pipe->read (&msg);
+        bool ok = inpipe->read (&msg);
         zmq_assert (ok);
-        if (sink)
-            sink->delimited (this);
-        terminate ();
+        delimited = true;
+
+        //  If pipe_term was already received but wasn't processed because
+        //  of pending messages, we can ack it now.
+        if (terminating)
+            send_pipe_term_ack (peer);
+
         return false;
     }
 
     return true;
 }
 
-bool zmq::reader_t::read (msg_t *msg_)
+bool zmq::pipe_t::read (msg_t *msg_)
 {
-    if (!active)
+    if (unlikely (!in_active))
         return false;
 
-    if (!pipe->read (msg_)) {
-        active = false;
+    if (!inpipe->read (msg_)) {
+        in_active = false;
         return false;
     }
 
     //  If delimiter was read, start termination process of the pipe.
     if (msg_->is_delimiter ()) {
-        if (sink)
-            sink->delimited (this);
-        terminate ();
+        delimited = true;
+
+        //  If pipe_term was already received but wasn't processed because
+        //  of pending messages, we can ack it now.
+        if (terminating)
+            send_pipe_term_ack (peer);
+
         return false;
     }
 
@@ -120,175 +143,148 @@ bool zmq::reader_t::read (msg_t *msg_)
         msgs_read++;
 
     if (lwm > 0 && msgs_read % lwm == 0)
-        send_activate_writer (writer, msgs_read);
+        send_activate_write (peer, msgs_read);
 
     return true;
 }
 
-void zmq::reader_t::terminate ()
+bool zmq::pipe_t::check_write (msg_t *msg_)
 {
-    //  If termination was already started by the peer, do nothing.
-    if (terminating)
-        return;
+    if (unlikely (!out_active))
+        return false;
 
-    active = false;
-    terminating = true;
-    send_pipe_term (writer);
-}
+    bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm);
 
-void zmq::reader_t::process_activate_reader ()
-{
-    //  Forward the event to the sink (either socket or session).
-    active = true;
-    sink->activated (this);
+    if (unlikely (full)) {
+        out_active = false;
+        return false;
+    }
+
+    return true;
 }
 
-void zmq::reader_t::process_pipe_term_ack ()
+bool zmq::pipe_t::write (msg_t *msg_)
 {
-    //  At this point writer may already be deallocated.
-    //  For safety's sake drop the reference to it.
-    writer = NULL;
+    if (unlikely (!check_write (msg_)))
+        return false;
 
-    //  Notify owner about the termination.
-    zmq_assert (sink);
-    sink->terminated (this);
+    outpipe->write (*msg_, msg_->flags () & msg_t::more);
+    if (!(msg_->flags () & msg_t::more))
+        msgs_written++;
 
-    //  Deallocate resources.
-    delete this;
+    return true;
 }
 
-zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_,
-      int hwm_) :
-    object_t (parent_),
-    active (true),
-    pipe (pipe_),
-    reader (reader_),
-    hwm (hwm_),
-    msgs_read (0),
-    msgs_written (0),
-    sink (NULL),
-    terminating (false)
+void zmq::pipe_t::rollback ()
 {
-    //  Inform reader about the writer.
-    reader->set_writer (this);
+    //  Remove incomplete message from the outbound pipe.
+    msg_t msg;
+    while (outpipe->unwrite (&msg)) {
+        zmq_assert (msg.flags () & msg_t::more);
+        int rc = msg.close ();
+        errno_assert (rc == 0);
+    }
 }
 
-zmq::writer_t::~writer_t ()
+void zmq::pipe_t::flush ()
 {
+    if (!outpipe->flush ())
+        send_activate_read (peer);
 }
 
-void zmq::writer_t::set_event_sink (i_writer_events *sink_)
+void zmq::pipe_t::process_activate_read ()
 {
-    zmq_assert (!sink);
-    sink = sink_;
+    if (!in_active && !terminating) {
+        in_active = true;
+        sink->read_activated (this);
+    }
 }
 
-bool zmq::writer_t::check_write (msg_t *msg_)
+void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
 {
-    //  We've already checked and there's no space free for the new message.
-    //  There's no point in checking once again.
-    if (unlikely (!active))
-        return false;
+    //  Remember the peers's message sequence number.
+    peers_msgs_read = msgs_read_;
 
-    if (unlikely (pipe_full ())) {
-        active = false;
-        return false;
+    if (!out_active && !terminating) {
+        out_active = true;
+        sink->write_activated (this);
     }
-
-    return true;
 }
 
-bool zmq::writer_t::write (msg_t *msg_)
+void zmq::pipe_t::process_pipe_term ()
 {
-    if (unlikely (!check_write (msg_)))
-        return false;
-
-    pipe->write (*msg_, msg_->flags () & msg_t::more);
-    if (!(msg_->flags () & msg_t::more))
-        msgs_written++;
-
-    return true;
+    term_recvd = true;
+
+    //  We can proceed with the termination if one of the following is true:
+    //  1. User asked this side of pipe to terminate already.
+    //  2. Waiting for pending messages in not required.
+    //  3. Delimiter was already received.
+    if (terminating || !delay || delimited) {
+        terminating = true;
+        send_pipe_term_ack (peer);
+    }
 }
 
-void zmq::writer_t::rollback ()
+void zmq::pipe_t::process_pipe_term_ack ()
 {
-    //  Remove incomplete message from the pipe.
+    //  Notify the user that all the references to the pipe should be dropped.
+    zmq_assert (sink);
+    sink->terminated (this);
+
+    //  If the peer haven't asked for the termination itself, we have to
+    //  ack the ack, so that it can deallocate properly.
+    if (!term_recvd)
+        send_pipe_term_ack (peer);
+
+    //  We'll deallocate the inbound pipe, the peer will deallocate the outbound
+    //  pipe (which is an inbound pipe from its point of view).
+    //  First, delete all the unread messages in the pipe. We have to do it by
+    //  hand because msg_t doesn't have automatic destructor. Then deallocate
+    //  the ypipe itself.
     msg_t msg;
-    while (pipe->unwrite (&msg)) {
-        zmq_assert (msg.flags () & msg_t::more);
-        int rc = msg.close ();
-        errno_assert (rc == 0);
+    while (inpipe->read (&msg)) {
+       int rc = msg.close ();
+       errno_assert (rc == 0);
     }
-}
+    delete inpipe;
 
-void zmq::writer_t::flush ()
-{
-    if (!pipe->flush ())
-        send_activate_reader (reader);
+    //  Deallocate the pipe object
+    delete this;
 }
 
-void zmq::writer_t::terminate ()
+void zmq::pipe_t::terminate ()
 {
     //  Prevent double termination.
     if (terminating)
         return;
     terminating = true;
 
-    //  Mark the pipe as not available for writing.
-    active = false;
+    //  Stop inbound and outbound flow of messages.
+    in_active = false;
+    out_active = false;
 
-    //  Rollback any unfinished messages.
+    //  Rollback any unfinished outbound messages.
     rollback ();
 
-    //  Push delimiter into the pipe. Trick the compiler to belive that
-    //  the tag is a valid pointer. Note that watermarks are not checked
-    //  thus the delimiter can be written even though the pipe is full.
+    //  Push delimiter into the outbound pipe. Note that watermarks are not
+    //  checked thus the delimiter can be written even though the pipe is full.
     msg_t msg;
     msg.init_delimiter ();
-    pipe->write (msg, false);
+    outpipe->write (msg, false);
     flush ();
-}
-
-void zmq::writer_t::process_activate_writer (uint64_t msgs_read_)
-{
-    //  Store the reader's message sequence number.
-    msgs_read = msgs_read_;
-
-    //  If the writer was non-active before, let's make it active
-    //  (available for writing messages to).
-    if (!active && !terminating) {
-        active = true;
-        zmq_assert (sink);
-        sink->activated (this);
-    }
-}
-
-void zmq::writer_t::process_pipe_term ()
-{
-    send_pipe_term_ack (reader);
-
-    //  The above command allows reader to deallocate itself and the pipe.
-    //  For safety's sake we'll drop the pointers here.
-    reader = NULL;
-    pipe = NULL;
 
-    //  Notify owner about the termination.
-    zmq_assert (sink);
-    sink->terminated (this);
-
-    //  Deallocate the resources.
-    delete this;
+    //  Start the termination handshaking.
+    send_pipe_term (peer);
 }
 
-bool zmq::writer_t::pipe_full ()
+bool zmq::pipe_t::is_delimiter (msg_t &msg_)
 {
-    return hwm > 0 && msgs_written - msgs_read == uint64_t (hwm);
+    return msg_.is_delimiter ();
 }
 
-void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_,
-    int hwm_, reader_t **reader_, writer_t **writer_)
+int zmq::pipe_t::compute_lwm (int hwm_)
 {
-    //  First compute the low water mark. Following point should be taken
+    //  Compute the low water mark. Following point should be taken
     //  into consideration:
     //
     //  1. LWM has to be less than HWM.
@@ -308,17 +304,8 @@ void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_,
     //  That done, we still we have to account for the cases where
     //  HWM < max_wm_delta thus driving LWM to negative numbers.
     //  Let's make LWM 1/2 of HWM in such cases.
-    int lwm = (hwm_ > max_wm_delta * 2) ?
+    int result = (hwm_ > max_wm_delta * 2) ?
         hwm_ - max_wm_delta : (hwm_ + 1) / 2;
 
-    //  Create all three objects pipe consists of: the pipe per se, reader and
-    //  writer. The pipe will be handled by reader and writer, its never passed
-    //  to the user. Reader and writer are returned to the user.
-    pipe_t *pipe = new (std::nothrow) pipe_t ();
-    alloc_assert (pipe);
-    *reader_ = new (std::nothrow) reader_t (reader_parent_, pipe, lwm);
-    alloc_assert (*reader_);
-    *writer_ = new (std::nothrow) writer_t (writer_parent_, pipe, *reader_,
-        hwm_);
-    alloc_assert (*writer_);
+    return result;
 }
diff --git a/src/pipe.hpp b/src/pipe.hpp
index 75b5c47..fcba877 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -22,47 +22,43 @@
 #define __ZMQ_PIPE_HPP_INCLUDED__
 
 #include "msg.hpp"
-#include "array.hpp"
 #include "ypipe.hpp"
 #include "config.hpp"
 #include "object.hpp"
-#include "stdint.hpp"
+#include "array.hpp"
 
 namespace zmq
 {
 
-    //  Creates a pipe. Returns pointer to reader and writer objects.
-    void create_pipe (object_t *reader_parent_, object_t *writer_parent_,
-        int hwm_, class reader_t **reader_, class writer_t **writer_);
-
-    //  The shutdown mechanism for pipe works as follows: Either endpoint
-    //  (or even both of them) can ask pipe to terminate by calling 'terminate'
-    //  method. Pipe then terminates in asynchronous manner. When the part of
-    //  the shutdown tied to the endpoint is done it triggers 'terminated'
-    //  event. When endpoint processes the event and returns, associated
-    //  reader/writer object is deallocated.
-
-    typedef ypipe_t <msg_t, message_pipe_granularity> pipe_t;
+    //  Create a pipepair for bi-directional transfer of messages.
+    //  First HWM is for messages passed from first pipe to the second pipe.
+    //  Second HWM is for messages passed from second pipe to the first pipe.
+    //  Delay specifies whether the pipe receives all the pending messages
+    //  before terminating or whether it terminates straight away.
+    int pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
+        int hwms_ [2], bool delays_ [2]);
 
-    struct i_reader_events
+    struct i_pipe_events
     {
-        virtual ~i_reader_events () {}
+        virtual ~i_pipe_events () {}
 
-        virtual void terminated (class reader_t *pipe_) = 0;
-        virtual void activated (class reader_t *pipe_) = 0;
-        virtual void delimited (class reader_t *pipe_) = 0;
+        virtual void read_activated (class pipe_t *pipe_) = 0;
+        virtual void write_activated (class pipe_t *pipe_) = 0;
+        virtual void terminated (class pipe_t *pipe_) = 0;
     };
 
-    class reader_t : public object_t, public array_item_t
+    class pipe_t :
+        public object_t,
+        public array_item_t
     {
-        friend void create_pipe (object_t*, object_t*, int,
-            reader_t**, writer_t**);
-        friend class writer_t;
+        //  This allows pipepair to create pipe objects.
+        friend int pipepair (class object_t *parents_ [2],
+            class pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]);
 
     public:
 
-        //  Specifies the object to get events from the reader.
-        void set_event_sink (i_reader_events *endpoint_);
+        //  Specifies the object to send events to.
+        void set_event_sink (i_pipe_events *sink_);
 
         //  Returns true if there is at least one message to read in the pipe.
         bool check_read ();
@@ -70,127 +66,100 @@ namespace zmq
         //  Reads a message to the underlying pipe.
         bool read (msg_t *msg_);
 
-        //  Ask pipe to terminate.
-        void terminate ();
-
-    private:
-
-        reader_t (class object_t *parent_, pipe_t *pipe_, int lwm_);
-        ~reader_t ();
-
-        //  To be called only by writer itself!
-        void set_writer (class writer_t *writer_);
-
-        //  Command handlers.
-        void process_activate_reader ();
-        void process_pipe_term_ack ();
-
-        //  Returns true if the message is delimiter; false otherwise.
-        static bool is_delimiter (msg_t &msg_);
-
-        //  True, if pipe can be read from.
-        bool active;
-
-        //  The underlying pipe.
-        pipe_t *pipe;
-
-        //  Pipe writer associated with the other side of the pipe.
-        class writer_t *writer;
-
-        //  Low watermark for in-memory storage (in bytes).
-        int lwm;
-
-        //  Number of messages read so far.
-        uint64_t msgs_read;
-
-        //  Sink for the events (either the socket of the session).
-        i_reader_events *sink;
-
-        //  True is 'terminate' method was called or delimiter
-        //  was read from the pipe.
-        bool terminating;
-
-        reader_t (const reader_t&);
-        const reader_t &operator = (const reader_t&);
-    };
-
-    struct i_writer_events
-    {
-        virtual ~i_writer_events () {}
-
-        virtual void terminated (class writer_t *pipe_) = 0;
-        virtual void activated (class writer_t *pipe_) = 0;
-    };
-
-    class writer_t : public object_t, public array_item_t
-    {
-        friend void create_pipe (object_t*, object_t*, int,
-            reader_t**, writer_t**);
-
-    public:
-
-        //  Specifies the object to get events from the writer.
-        void set_event_sink (i_writer_events *endpoint_);
-
-        //  Checks whether messages can be written to the pipe.
-        //  If writing the message would cause high watermark
-        //  the function returns false.
+        //  Checks whether messages can be written to the pipe. If writing
+        //  the message would cause high watermark the function returns false.
         bool check_write (msg_t *msg_);
 
         //  Writes a message to the underlying pipe. Returns false if the
         //  message cannot be written because high watermark was reached.
         bool write (msg_t *msg_);
 
-        //  Remove unfinished part of a message from the pipe.
+        //  Remove unfinished parts of the outbound message from the pipe.
         void rollback ();
 
         //  Flush the messages downsteam.
         void flush ();
 
-        //  Ask pipe to terminate.
+        //  Ask pipe to terminate. The termination will happen asynchronously
+        //  and user will be notified about actual deallocation by 'terminated'
+        //  event.
         void terminate ();
 
     private:
 
-        writer_t (class object_t *parent_, pipe_t *pipe_, reader_t *reader_,
-            int hwm_);
-        ~writer_t ();
-
         //  Command handlers.
-        void process_activate_writer (uint64_t msgs_read_);
+        void process_activate_read ();
+        void process_activate_write (uint64_t msgs_read_);
         void process_pipe_term ();
+        void process_pipe_term_ack ();
+
+        //  Type of the underlying lock-free pipe.
+        typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t;
 
-        //  Tests whether underlying pipe is already full.
-        bool pipe_full ();
+        //  Constructor is private. Pipe can only be created using
+        //  pipepair function.
+        pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
+            int inhwm_, int outhwm_, bool delay_);
 
-        //  True, if this object can be written to.
-        bool active;
+        //  Pipepair uses this function to let us know about
+        //  the peer pipe object.
+        void set_peer (pipe_t *pipe_);
 
-        //  The underlying pipe.
-        pipe_t *pipe;
+        //  Destructor is private. Pipe objects destroy themselves.
+        ~pipe_t ();
 
-        //  Pipe reader associated with the other side of the pipe.
-        reader_t *reader;
+        //  Underlying pipes for both directions.
+        upipe_t *inpipe;
+        upipe_t *outpipe;
 
-        //  High watermark for in-memory storage (in bytes).
+        //  Can the pipe be read from / written to?
+        bool in_active;
+        bool out_active;
+
+        //  High watermark for the outbound pipe.
         int hwm;
 
-        //  Last confirmed number of messages read from the pipe.
-        //  The actual number can be higher.
-        uint64_t msgs_read;
+        //  Low watermark for the inbound pipe.
+        int lwm;
 
-        //  Number of messages we have written so far.
+        //  Number of messages read and written so far.
+        uint64_t msgs_read;
         uint64_t msgs_written;
 
-        //  Sink for the events (either the socket or the session).
-        i_writer_events *sink;
+        //  Last received peer's msgs_read. The actual number in the peer
+        //  can be higher at the moment.
+        uint64_t peers_msgs_read;
 
-        //  True is 'terminate' method was called of 'pipe_term' command
-        //  arrived from the reader.
+        //  The pipe object on the other side of the pipepair.
+        pipe_t *peer;
+
+        //  Sink to send events to.
+        i_pipe_events *sink;
+
+        //  True is 'terminate' method was called or termination request
+        //  was received from the peer.
         bool terminating;
 
-        writer_t (const writer_t&);
-        const writer_t &operator = (const writer_t&);
+        //  True is we've already got pipe_term command from the peer.
+        bool term_recvd;
+
+        //  True if delimiter was already received from the peer.
+        bool delimited;
+
+        //  If true, we receive all the pending inbound messages before
+        //  terminating. If false, we terminate immediately when the peer
+        //  asks us to.
+        bool delay;
+
+        //  Returns true if the message is delimiter; false otherwise.
+        static bool is_delimiter (msg_t &msg_);
+
+        //  Computes appropriate low watermark from the given high watermark.
+        static int compute_lwm (int hwm_);
+
+        //  Disable copying.
+        pipe_t (const pipe_t&);
+        const pipe_t &operator = (const pipe_t&);
     };
 
 }
diff --git a/src/pull.cpp b/src/pull.cpp
index b9d4433..66457b8 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -27,19 +27,33 @@ zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_) :
     fq (this)
 {
     options.type = ZMQ_PULL;
-    options.requires_in = true;
-    options.requires_out = false;
 }
 
 zmq::pull_t::~pull_t ()
 {
 }
 
-void zmq::pull_t::xattach_pipes (class reader_t *inpipe_,
-    class writer_t *outpipe_, const blob_t &peer_identity_)
+void zmq::pull_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
 {
-    zmq_assert (inpipe_ && !outpipe_);
-    fq.attach (inpipe_);
+    zmq_assert (pipe_);
+    pipe_->set_event_sink (this);
+    fq.attach (pipe_);
+}
+
+void zmq::pull_t::read_activated (pipe_t *pipe_)
+{
+    fq.activated (pipe_);
+}
+
+void zmq::pull_t::write_activated (pipe_t *pipe_)
+{
+    //  There are no outbound messages in pull socket. This should never happen.
+    zmq_assert (false);
+}
+
+void zmq::pull_t::terminated (pipe_t *pipe_)
+{
+    fq.terminated (pipe_);
 }
 
 void zmq::pull_t::process_term (int linger_)
diff --git a/src/pull.hpp b/src/pull.hpp
index ffc3fdb..af59724 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -22,12 +22,15 @@
 #define __ZMQ_PULL_HPP_INCLUDED__
 
 #include "socket_base.hpp"
+#include "pipe.hpp"
 #include "fq.hpp"
 
 namespace zmq
 {
 
-    class pull_t : public socket_base_t
+    class pull_t :
+        public socket_base_t,
+        public i_pipe_events
     {
     public:
 
@@ -37,13 +40,17 @@ namespace zmq
     protected:
 
         //  Overloads of functions from socket_base_t.
-        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
-            const blob_t &peer_identity_);
+        void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
         int xrecv (class msg_t *msg_, int flags_);
         bool xhas_in ();
 
     private:
 
+        //  i_pipe_events interface implementation.
+        void read_activated (pipe_t *pipe_);
+        void write_activated (pipe_t *pipe_);
+        void terminated (pipe_t *pipe_);
+
         //  Hook into the termination process.
         void process_term (int linger_);
 
diff --git a/src/push.cpp b/src/push.cpp
index d6ee399..12fc8d2 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -28,19 +28,33 @@ zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_) :
     lb (this)
 {
     options.type = ZMQ_PUSH;
-    options.requires_in = false;
-    options.requires_out = true;
 }
 
 zmq::push_t::~push_t ()
 {
 }
 
-void zmq::push_t::xattach_pipes (class reader_t *inpipe_,
-    class writer_t *outpipe_, const blob_t &peer_identity_)
+void zmq::push_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
 {
-    zmq_assert (!inpipe_ && outpipe_);
-    lb.attach (outpipe_);
+    zmq_assert (pipe_);
+    pipe_->set_event_sink (this);
+    lb.attach (pipe_);
+}
+
+void zmq::push_t::read_activated (pipe_t *pipe_)
+{
+    //  There are no inbound messages in push socket. This should never happen.
+    zmq_assert (false);
+}
+
+void zmq::push_t::write_activated (pipe_t *pipe_)
+{
+    lb.activated (pipe_);
+}
+
+void zmq::push_t::terminated (pipe_t *pipe_)
+{
+    lb.terminated (pipe_);
 }
 
 void zmq::push_t::process_term (int linger_)
diff --git a/src/push.hpp b/src/push.hpp
index c4d63f6..67763eb 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -22,12 +22,15 @@
 #define __ZMQ_PUSH_HPP_INCLUDED__
 
 #include "socket_base.hpp"
+#include "pipe.hpp"
 #include "lb.hpp"
 
 namespace zmq
 {
 
-    class push_t : public socket_base_t
+    class push_t :
+        public socket_base_t,
+        public i_pipe_events
     {
     public:
 
@@ -37,13 +40,17 @@ namespace zmq
     protected:
 
         //  Overloads of functions from socket_base_t.
-        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
-            const blob_t &peer_identity_);
+        void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
         int xsend (class msg_t *msg_, int flags_);
         bool xhas_out ();
 
     private:
 
+        //  i_pipe_events interface implementation.
+        void read_activated (pipe_t *pipe_);
+        void write_activated (pipe_t *pipe_);
+        void terminated (pipe_t *pipe_);
+
         //  Hook into the termination process.
         void process_term (int linger_);
 
diff --git a/src/session.cpp b/src/session.cpp
index 499fe40..5ef21c7 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -29,13 +29,12 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
       class socket_base_t *socket_, const options_t &options_) :
     own_t (io_thread_, options_),
     io_object_t (io_thread_),
-    in_pipe (NULL),
+    pipe (NULL),
     incomplete_in (false),
-    out_pipe (NULL),
     engine (NULL),
     socket (socket_),
     io_thread (io_thread_),
-    pipes_attached (false),
+    pipe_attached (false),
     delimiter_processed (false),
     force_terminate (false),
     has_linger_timer (false),
@@ -45,8 +44,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
 
 zmq::session_t::~session_t ()
 {
-    zmq_assert (!in_pipe);
-    zmq_assert (!out_pipe);
+    zmq_assert (!pipe);
 
     if (engine)
         engine->terminate ();
@@ -66,13 +64,9 @@ void zmq::session_t::proceed_with_term ()
         has_linger_timer = false;
     }
 
-    if (in_pipe) {
+    if (pipe) {
         register_term_acks (1);
-        in_pipe->terminate ();
-    }
-    if (out_pipe) {
-        register_term_acks (1);
-        out_pipe->terminate ();
+        pipe->terminate ();
     }
 
     //  The session has already waited for the linger period. We don't want
@@ -82,10 +76,10 @@ void zmq::session_t::proceed_with_term ()
 
 bool zmq::session_t::read (msg_t *msg_)
 {
-    if (!in_pipe)
+    if (!pipe)
         return false;
 
-    if (!in_pipe->read (msg_))
+    if (!pipe->read (msg_))
         return false;
 
     incomplete_in = msg_->flags () & msg_t::more;
@@ -94,7 +88,7 @@ bool zmq::session_t::read (msg_t *msg_)
 
 bool zmq::session_t::write (msg_t *msg_)
 {
-    if (out_pipe && out_pipe->write (msg_)) {
+    if (pipe && pipe->write (msg_)) {
         int rc = msg_->init ();
         errno_assert (rc == 0);
         return true;
@@ -105,21 +99,20 @@ bool zmq::session_t::write (msg_t *msg_)
 
 void zmq::session_t::flush ()
 {
-    if (out_pipe)
-        out_pipe->flush ();
+    if (pipe)
+        pipe->flush ();
 }
 
 void zmq::session_t::clean_pipes ()
 {
-    //  Get rid of half-processed messages in the out pipe. Flush any
-    //  unflushed messages upstream.
-    if (out_pipe) {
-        out_pipe->rollback ();
-        out_pipe->flush ();
-    }
+    if (pipe) {
 
-    //  Remove any half-read message from the in pipe.
-    if (in_pipe) {
+        //  Get rid of half-processed messages in the out pipe. Flush any
+        //  unflushed messages upstream.
+        pipe->rollback ();
+        pipe->flush ();
+
+        //  Remove any half-read message from the in pipe.
         while (incomplete_in) {
             msg_t msg;
             int rc = msg.init ();
@@ -134,78 +127,54 @@ void zmq::session_t::clean_pipes ()
     }
 }
 
-void zmq::session_t::attach_pipes (class reader_t *inpipe_,
-    class writer_t *outpipe_, const blob_t &peer_identity_)
+void zmq::session_t::attach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
 {
-    zmq_assert (!pipes_attached);
-    pipes_attached = true;
+    zmq_assert (!pipe_attached);
+    pipe_attached = true;
     
-    if (inpipe_) {
-        zmq_assert (!in_pipe);
-        in_pipe = inpipe_;
-        in_pipe->set_event_sink (this);
-    }
-
-    if (outpipe_) {
-        zmq_assert (!out_pipe);
-        out_pipe = outpipe_;
-        out_pipe->set_event_sink (this);
+    if (pipe_) {
+        zmq_assert (!pipe);
+        pipe = pipe_;
+        pipe->set_event_sink (this);
     }
 
     //  If we are already terminating, terminate the pipes straight away.
     if (state == terminating) {
-        if (in_pipe) {
-            in_pipe->terminate ();
-            register_term_acks (1);
-        }
-        if (out_pipe) {
-            out_pipe->terminate ();
+        if (pipe) {
+            pipe->terminate ();
             register_term_acks (1);
         }
     }
 }
 
-void zmq::session_t::delimited (reader_t *pipe_)
+void zmq::session_t::terminated (pipe_t *pipe_)
 {
-    zmq_assert (in_pipe == pipe_);
-    zmq_assert (!delimiter_processed);
-    delimiter_processed = true;
+    zmq_assert (pipe == pipe_);
 
-    //  If we are in process of being closed, but still waiting for all
-    //  pending messeges being sent, we can terminate here.
+    // If we are in process of being closed, but still waiting for all
+    // pending messeges being sent, we can terminate here.
     if (state == pending)
         proceed_with_term ();
-}
 
-void zmq::session_t::terminated (reader_t *pipe_)
-{
-    zmq_assert (in_pipe == pipe_);
-    in_pipe = NULL;
+    pipe = NULL;
     if (state == terminating)
         unregister_term_ack ();
 }
 
-void zmq::session_t::terminated (writer_t *pipe_)
+void zmq::session_t::read_activated (pipe_t *pipe_)
 {
-    zmq_assert (out_pipe == pipe_);
-    out_pipe = NULL;
-    if (state == terminating)
-        unregister_term_ack ();
-}
-
-void zmq::session_t::activated (reader_t *pipe_)
-{
-    zmq_assert (in_pipe == pipe_);
+    zmq_assert (pipe == pipe_);
 
     if (likely (engine != NULL))
         engine->activate_out ();
     else
-        in_pipe->check_read ();
+        pipe->check_read ();
 }
 
-void zmq::session_t::activated (writer_t *pipe_)
+void zmq::session_t::write_activated (pipe_t *pipe_)
 {
-    zmq_assert (out_pipe == pipe_);
+    zmq_assert (pipe == pipe_);
+
     if (engine)
         engine->activate_in ();
 }
@@ -240,29 +209,27 @@ void zmq::session_t::process_attach (i_engine *engine_,
         return;
     }
 
-    //  Check whether the required pipes already exist. If not so, we'll
-    //  create them and bind them to the socket object.
-    if (!pipes_attached) {
-        zmq_assert (!in_pipe && !out_pipe);
-        pipes_attached = true;
-        reader_t *socket_reader = NULL;
-        writer_t *socket_writer = NULL;
-
-        //  Create the pipes, as required.
-        if (options.requires_in) {
-            create_pipe (socket, this, options.rcvhwm, &socket_reader,
-                &out_pipe);
-            out_pipe->set_event_sink (this);
-        }
-        if (options.requires_out) {
-            create_pipe (this, socket, options.sndhwm, &in_pipe,
-                &socket_writer);
-            in_pipe->set_event_sink (this);
-        }
+    //  Check whether the required pipe already exists and create it
+    //  if it does not.
+    if (!pipe_attached) {
+        zmq_assert (!pipe);
+        pipe_attached = true;
+
+        object_t *parents [2] = {this, socket};
+        pipe_t *pipes [2] = {NULL, NULL};
+        int hwms [2] = {options.rcvhwm, options.sndhwm};
+        bool delays [2] = {true, true};
+        int rc = pipepair (parents, pipes, hwms, delays);
+        errno_assert (rc == 0);
+
+        //  Plug the local end of the pipe.
+        pipes [0]->set_event_sink (this);
+
+        //  Remember the local end of the pipe.
+        pipe = pipes [0];
 
-        //  Bind the pipes to the socket object.
-        if (socket_reader || socket_writer)
-            send_bind (socket, socket_reader, socket_writer, peer_identity_);
+        //  Ask socket to plug into the remote end of the pipe.
+        send_bind (socket, pipes [1], peer_identity_);
     }
 
     //  Plug in the engine.
@@ -282,9 +249,9 @@ void zmq::session_t::detach ()
     //  Send the event to the derived class.
     detached ();
 
-    //  Just in case, there's only a delimiter in the inbound pipe.
-    if (in_pipe)
-        in_pipe->check_read ();
+    //  Just in case there's only a delimiter in the inbound pipe.
+    if (pipe)
+        pipe->check_read ();
 }
 
 void zmq::session_t::process_term (int linger_)
@@ -308,16 +275,16 @@ void zmq::session_t::process_term (int linger_)
 
     //  If there's no engine and there's only delimiter in the pipe it wouldn't
     //  be ever read. Thus we check for it explicitly.
-    if (in_pipe)
-        in_pipe->check_read ();
+    if (pipe)
+        pipe->check_read ();
 
-    //  If there's no in pipe there are no pending messages to send.
+    //  If there's no in pipe, there are no pending messages to send.
     //  We can proceed with the shutdown straight away. Also, if there is
-    //  inbound pipe, but the delimiter was already processed, we can
-    //  terminate immediately. Alternatively, if the derived session type have
+    //  pipe, but the delimiter was already processed, we can terminate
+    //  immediately. Alternatively, if the derived session type have
     //  called 'terminate' we'll finish straight away.
-    if (!options.requires_out || delimiter_processed || force_terminate ||
-        (!options.immediate_connect && !in_pipe))
+    if (delimiter_processed || force_terminate ||
+          (!options.immediate_connect && !pipe))
         proceed_with_term ();
 }
 
diff --git a/src/session.hpp b/src/session.hpp
index d2f8882..4a12d68 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -34,8 +34,7 @@ namespace zmq
         public own_t,
         public io_object_t,
         public i_inout,
-        public i_reader_events,
-        public i_writer_events
+        public i_pipe_events
     {
     public:
 
@@ -50,17 +49,12 @@ namespace zmq
         void flush ();
         void detach ();
 
-        void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
-            const blob_t &peer_identity_);
-
-        //  i_reader_events interface implementation.
-        void activated (class reader_t *pipe_);
-        void terminated (class reader_t *pipe_);
-        void delimited (class reader_t *pipe_);
+        void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
 
-        //  i_writer_events interface implementation.
-        void activated (class writer_t *pipe_);
-        void terminated (class writer_t *pipe_);
+        //  i_pipe_events interface implementation.
+        void read_activated (class pipe_t *pipe_);
+        void write_activated (class pipe_t *pipe_);
+        void terminated (class pipe_t *pipe_);
 
     protected:
 
@@ -103,16 +97,13 @@ namespace zmq
         //  Call this function to move on with the delayed process_term.
         void proceed_with_term ();
 
-        //  Inbound pipe, i.e. one the session is getting messages from.
-        class reader_t *in_pipe;
+        //  Pipe connecting the session to its socket.
+        class pipe_t *pipe;
 
         //  This flag is true if the remainder of the message being processed
         //  is still in the in pipe.
         bool incomplete_in;
 
-        //  Outbound pipe, i.e. one the socket is sending messages to.
-        class writer_t *out_pipe;
-
         //  The protocol I/O engine connected to the session.
         struct i_engine *engine;
 
@@ -123,8 +114,8 @@ namespace zmq
         //  the engines into the same thread.
         class io_thread_t *io_thread;
 
-        //  If true, pipes were already attached to this session.
-        bool pipes_attached;
+        //  If true, pipe was already attached to this session.
+        bool pipe_attached;
 
         //  If true, delimiter was already read from the inbound pipe.
         bool delimiter_processed;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 3e104a8..baa4bd2 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -211,17 +211,17 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
     return 0;
 }
 
-void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
-    class writer_t *outpipe_, const blob_t &peer_identity_)
+void zmq::socket_base_t::attach_pipe (class pipe_t *pipe_,
+    const blob_t &peer_identity_)
 {
     // If the peer haven't specified it's identity, let's generate one.
     if (peer_identity_.size ()) {
-        xattach_pipes (inpipe_, outpipe_, peer_identity_);
+        xattach_pipe (pipe_, peer_identity_);
     }
     else {
         blob_t identity (17, 0);
         generate_uuid ((unsigned char*) identity.data () + 1);
-        xattach_pipes (inpipe_, outpipe_, identity);
+        xattach_pipe (pipe_, identity);
     }
 }
 
@@ -378,11 +378,6 @@ int zmq::socket_base_t::connect (const char *addr_)
         if (!peer.socket)
             return -1;
 
-        reader_t *inpipe_reader = NULL;
-        writer_t *inpipe_writer = NULL;
-        reader_t *outpipe_reader = NULL;
-        writer_t *outpipe_writer = NULL;
-
         // The total HWM for an inproc connection should be the sum of
         // the binder's HWM and the connector's HWM.
         int  sndhwm;
@@ -396,24 +391,21 @@ int zmq::socket_base_t::connect (const char *addr_)
         else
             rcvhwm = options.rcvhwm + peer.options.sndhwm;
 
-        //  Create inbound pipe, if required.
-        if (options.requires_in)
-            create_pipe (this, peer.socket, rcvhwm, &inpipe_reader,
-                &inpipe_writer);
-
-        //  Create outbound pipe, if required.
-        if (options.requires_out)
-            create_pipe (peer.socket, this, sndhwm, &outpipe_reader,
-                &outpipe_writer);
+        //  Create a bi-directional pipe to connect the peers.
+        object_t *parents [2] = {this, peer.socket};
+        pipe_t *pipes [2] = {NULL, NULL};
+        int hwms [2] = {sndhwm, rcvhwm};
+        bool delays [2] = {true, true};
+        int rc = pipepair (parents, pipes, hwms, delays);
+        errno_assert (rc == 0);
 
-        //  Attach the pipes to this socket object.
-        attach_pipes (inpipe_reader, outpipe_writer, peer.options.identity);
+        //  Attach local end of the pipe to this socket object.
+        attach_pipe (pipes [0], peer.options.identity);
 
-        //  Attach the pipes to the peer socket. Note that peer's seqnum
-        //  was incremented in find_endpoint function. We don't need it
+        //  Attach remote end of the pipe to the peer socket. Note that peer's
+        //  seqnum was incremented in find_endpoint function. We don't need it
         //  increased here.
-        send_bind (peer.socket, outpipe_reader, inpipe_writer,
-            options.identity, false);
+        send_bind (peer.socket, pipes [1], options.identity, false);
 
         return 0;
     }
@@ -435,26 +427,19 @@ int zmq::socket_base_t::connect (const char *addr_)
     //  session once the connection is established.
     if (options.immediate_connect) {
 
-        reader_t *inpipe_reader = NULL;
-        writer_t *inpipe_writer = NULL;
-        reader_t *outpipe_reader = NULL;
-        writer_t *outpipe_writer = NULL;
-
-        //  Create inbound pipe, if required.
-        if (options.requires_in)
-            create_pipe (this, session, options.rcvhwm,
-                &inpipe_reader, &inpipe_writer);
-
-        //  Create outbound pipe, if required.
-        if (options.requires_out)
-            create_pipe (session, this, options.sndhwm,
-                &outpipe_reader, &outpipe_writer);
+        //  Create a bi-directional pipe.
+        object_t *parents [2] = {this, session};
+        pipe_t *pipes [2] = {NULL, NULL};
+        int hwms [2] = {options.sndhwm, options.rcvhwm};
+        bool delays [2] = {true, true};
+        int rc = pipepair (parents, pipes, hwms, delays);
+        errno_assert (rc == 0);
 
-        //  Attach the pipes to the socket object.
-        attach_pipes (inpipe_reader, outpipe_writer, blob_t ());
+        //  Attach local end of the pipe to the socket object.
+        attach_pipe (pipes [0], blob_t ());
 
-        //  Attach the pipes to the session object.
-        session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ());
+        //  Attach remote end of the pipe to the session object.
+        session->attach_pipe (pipes [1], blob_t ());
     }
 
     //  Activate the session. Make it a child of this socket.
@@ -718,10 +703,10 @@ void zmq::socket_base_t::process_stop ()
     ctx_terminated = true;
 }
 
-void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+void zmq::socket_base_t::process_bind (pipe_t *pipe_,
     const blob_t &peer_identity_)
 {
-    attach_pipes (in_pipe_, out_pipe_, peer_identity_);
+    attach_pipe (pipe_, peer_identity_);
 }
 
 void zmq::socket_base_t::process_unplug ()
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 0a5c574..531751b 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -110,8 +110,8 @@ namespace zmq
 
         //  Concrete algorithms for the x- methods are to be defined by
         //  individual socket types.
-        virtual void xattach_pipes (class reader_t *inpipe_,
-            class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
+        virtual void xattach_pipe (class pipe_t *pipe_,
+            const blob_t &peer_identity_) = 0;
 
         //  The default implementation assumes there are no specific socket
         //  options for the particular socket type. If not so, overload this
@@ -156,9 +156,8 @@ namespace zmq
         //  bind, is available and compatible with the socket type.
         int check_protocol (const std::string &protocol_);
 
-        //  If no identity set generate one and call xattach_pipes ().
-        void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
-            const blob_t &peer_identity_);
+        //  If no identity is set, generate one and call xattach_pipe ().
+        void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
 
         //  Processes commands sent to this socket (if any). If 'block' is
         //  set to true, returns only after at least one command was processed.
@@ -168,8 +167,7 @@ namespace zmq
 
         //  Handlers for incoming commands.
         void process_stop ();
-        void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_,
-            const blob_t &peer_identity_);
+        void process_bind (class pipe_t *pipe_, const blob_t &peer_identity_);
         void process_unplug ();
 
         //  Socket's mailbox object.
diff --git a/src/xpub.cpp b/src/xpub.cpp
index 2b5c4eb..888b42d 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -28,19 +28,33 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
     dist (this)
 {
     options.type = ZMQ_XPUB;
-    options.requires_in = false;
-    options.requires_out = true;
 }
 
 zmq::xpub_t::~xpub_t ()
 {
 }
 
-void zmq::xpub_t::xattach_pipes (class reader_t *inpipe_,
-    class writer_t *outpipe_, const blob_t &peer_identity_)
+void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
 {
-    zmq_assert (!inpipe_ && outpipe_);
-    dist.attach (outpipe_);
+    zmq_assert (pipe_);
+    pipe_->set_event_sink (this);
+    dist.attach (pipe_);
+}
+
+void zmq::xpub_t::read_activated (pipe_t *pipe_)
+{
+    //  PUB socket never receives messages. This should never happen.
+    zmq_assert (false);
+}
+
+void zmq::xpub_t::write_activated (pipe_t *pipe_)
+{
+    dist.activated (pipe_);
+}
+
+void zmq::xpub_t::terminated (pipe_t *pipe_)
+{
+    dist.terminated (pipe_);
 }
 
 void zmq::xpub_t::process_term (int linger_)
diff --git a/src/xpub.hpp b/src/xpub.hpp
index 19aa38a..48efd17 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -29,7 +29,9 @@
 namespace zmq
 {
 
-    class xpub_t : public socket_base_t
+    class xpub_t :
+        public socket_base_t,
+        public i_pipe_events
     {
     public:
 
@@ -37,8 +39,7 @@ namespace zmq
         ~xpub_t ();
 
         //  Implementations of virtual functions from socket_base_t.
-        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
-            const blob_t &peer_identity_);
+        void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
         int xsend (class msg_t *msg_, int flags_);
         bool xhas_out ();
         int xrecv (class msg_t *msg_, int flags_);
@@ -46,6 +47,11 @@ namespace zmq
 
     private:
 
+        //  i_pipe_events interface implementation.
+        void read_activated (pipe_t *pipe_);
+        void write_activated (pipe_t *pipe_);
+        void terminated (pipe_t *pipe_);
+
         //  Hook into the termination process.
         void process_term (int linger_);
 
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 2650f4e..d82890d 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -32,8 +32,6 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
     terminating (false)
 {
     options.type = ZMQ_XREP;
-    options.requires_in = true;
-    options.requires_out = true;
 
     //  On connect, pipes are created only after initial handshaking.
     //  That way we are aware of the peer's identity when binding to the pipes.
@@ -46,36 +44,26 @@ zmq::xrep_t::~xrep_t ()
     zmq_assert (outpipes.empty ());
 }
 
-void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
-    const blob_t &peer_identity_)
+void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
 {
-    if (outpipe_) {
-
-        outpipe_->set_event_sink (this);
-
-        //  TODO: What if new connection has same peer identity as the old one?
-        outpipe_t outpipe = {outpipe_, true};
-        bool ok = outpipes.insert (outpipes_t::value_type (
-            peer_identity_, outpipe)).second;
-        zmq_assert (ok);
-
-        if (terminating) {
-            register_term_acks (1);
-            outpipe_->terminate ();
-        }
-    }
-
-    if (inpipe_) {
-
-        inpipe_->set_event_sink (this);
-
-        inpipe_t inpipe = {inpipe_, peer_identity_, true};
-        inpipes.push_back (inpipe);
-
-        if (terminating) {
-            register_term_acks (1);
-            inpipe_->terminate ();
-        }
+    zmq_assert (pipe_);
+    pipe_->set_event_sink (this);
+
+    //  Add the pipe to the map out outbound pipes.
+    //  TODO: What if new connection has same peer identity as the old one?
+    outpipe_t outpipe = {pipe_, true};
+    bool ok = outpipes.insert (outpipes_t::value_type (
+        peer_identity_, outpipe)).second;
+    zmq_assert (ok);
+
+    //  Add the pipe to the list of inbound pipes.
+    inpipe_t inpipe = {pipe_, peer_identity_, true};
+    inpipes.push_back (inpipe);
+
+    //  In case we are already terminating, ask this pipe to terminate as well.
+    if (terminating) {
+        register_term_acks (1);
+        pipe_->terminate ();
     }
 }
 
@@ -85,21 +73,17 @@ void zmq::xrep_t::process_term (int linger_)
 
     register_term_acks ((int) (inpipes.size () + outpipes.size ()));
 
-    for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
-          ++it)
-        it->reader->terminate ();
-    for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end ();
-          ++it)
-        it->second.writer->terminate ();
+    for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it)
+        it->pipe->terminate ();
 
     socket_base_t::process_term (linger_);
 }
 
-void zmq::xrep_t::terminated (reader_t *pipe_)
+void zmq::xrep_t::terminated (pipe_t *pipe_)
 {
     for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
           ++it) {
-        if (it->reader == pipe_) {
+        if (it->pipe == pipe_) {
             if ((inpipes_t::size_type) (it - inpipes.begin ()) < current_in)
                 current_in--;
             inpipes.erase (it);
@@ -107,17 +91,15 @@ void zmq::xrep_t::terminated (reader_t *pipe_)
                 current_in = 0;
             if (terminating)
                 unregister_term_ack ();
-            return;
+            goto clean_outpipes;
         }
     }
     zmq_assert (false);
-}
 
-void zmq::xrep_t::terminated (writer_t *pipe_)
-{
+clean_outpipes:
     for (outpipes_t::iterator it = outpipes.begin ();
           it != outpipes.end (); ++it) {
-        if (it->second.writer == pipe_) {
+        if (it->second.pipe == pipe_) {
             outpipes.erase (it);
             if (pipe_ == current_out)
                 current_out = NULL;
@@ -129,15 +111,11 @@ void zmq::xrep_t::terminated (writer_t *pipe_)
     zmq_assert (false);
 }
 
-void zmq::xrep_t::delimited (reader_t *pipe_)
-{
-}
-
-void zmq::xrep_t::activated (reader_t *pipe_)
+void zmq::xrep_t::read_activated (pipe_t *pipe_)
 {
     for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
           ++it) {
-        if (it->reader == pipe_) {
+        if (it->pipe == pipe_) {
             zmq_assert (!it->active);
             it->active = true;
             return;
@@ -146,11 +124,11 @@ void zmq::xrep_t::activated (reader_t *pipe_)
     zmq_assert (false);
 }
 
-void zmq::xrep_t::activated (writer_t *pipe_)
+void zmq::xrep_t::write_activated (pipe_t *pipe_)
 {
     for (outpipes_t::iterator it = outpipes.begin ();
           it != outpipes.end (); ++it) {
-        if (it->second.writer == pipe_) {
+        if (it->second.pipe == pipe_) {
             zmq_assert (!it->second.active);
             it->second.active = true;
             return;
@@ -178,7 +156,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
             outpipes_t::iterator it = outpipes.find (identity);
 
             if (it != outpipes.end ()) {
-                current_out = it->second.writer;
+                current_out = it->second.pipe;
                 msg_t empty;
                 int rc = empty.init ();
                 errno_assert (rc == 0);
@@ -245,7 +223,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
     //  If we are in the middle of reading a message, just grab next part of it.
     if (more_in) {
         zmq_assert (inpipes [current_in].active);
-        bool fetched = inpipes [current_in].reader->read (msg_);
+        bool fetched = inpipes [current_in].pipe->read (msg_);
         zmq_assert (fetched);
         more_in = msg_->flags () & msg_t::more;
         if (!more_in) {
@@ -261,7 +239,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
 
         //  Try to fetch new message.
         if (inpipes [current_in].active)
-            prefetched = inpipes [current_in].reader->read (&prefetched_msg);
+            prefetched = inpipes [current_in].pipe->read (&prefetched_msg);
 
         //  If we have a message, create a prefix and return it to the caller.
         if (prefetched) {
@@ -311,7 +289,7 @@ bool zmq::xrep_t::xhas_in ()
     //  pipe holding messages, skipping only pipes with no messages available.
     for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) {
         if (inpipes [current_in].active &&
-              inpipes [current_in].reader->check_read ())
+              inpipes [current_in].pipe->check_read ())
             return true;
 
         //  If me don't have a message, mark the pipe as passive and
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 7ca138c..d0378c2 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -35,8 +35,7 @@ namespace zmq
     //  TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
     class xrep_t :
         public socket_base_t,
-        public i_reader_events,
-        public i_writer_events
+        public i_pipe_events
     {
     public:
 
@@ -44,8 +43,7 @@ namespace zmq
         ~xrep_t ();
 
         //  Overloads of functions from socket_base_t.
-        void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
-            const blob_t &peer_identity_);
+        void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
         int xsend (class msg_t *msg_, int flags_);
         int xrecv (class msg_t *msg_, int flags_);
         bool xhas_in ();
@@ -61,18 +59,14 @@ namespace zmq
         //  Hook into the termination process.
         void process_term (int linger_);
 
-        //  i_reader_events interface implementation.
-        void activated (reader_t *pipe_);
-        void terminated (reader_t *pipe_);
-        void delimited (reader_t *pipe_);
-
-        //  i_writer_events interface implementation.
-        void activated (writer_t *pipe_);
-        void terminated (writer_t *pipe_);
+        //  i_pipe_events interface implementation.
+        void read_activated (pipe_t *pipe_);
+        void write_activated (pipe_t *pipe_);
+        void terminated (pipe_t *pipe_);
 
         struct inpipe_t
         {
-            class reader_t *reader;
+            class pipe_t *pipe;
             blob_t identity;
             bool active;
         };
@@ -95,7 +89,7 @@ namespace zmq
 
         struct outpipe_t
         {
-            class writer_t *writer;
+            class pipe_t *pipe;
             bool active;
         };
 
@@ -104,7 +98,7 @@ namespace zmq
         outpipes_t outpipes;
 
         //  The pipe we are currently writing to.
-        class writer_t *current_out;
+        class pipe_t *current_out;
 
         //  If true, more outgoing message parts are expected.
         bool more_out;
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 2fda2c1..4a6e67e 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -28,20 +28,18 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) :
     lb (this)
 {
     options.type = ZMQ_XREQ;
-    options.requires_in = true;
-    options.requires_out = true;
 }
 
 zmq::xreq_t::~xreq_t ()
 {
 }
 
-void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
-    class writer_t *outpipe_, const blob_t &peer_identity_)
+void zmq::xreq_t::xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_)
 {
-    zmq_assert (inpipe_ && outpipe_);
-    fq.attach (inpipe_);
-    lb.attach (outpipe_);
+    zmq_assert (pipe_);
+    pipe_->set_event_sink (this);
+    fq.attach (pipe_);
+    lb.attach (pipe_);
 }
 
 void zmq::xreq_t::process_term (int linger_)
@@ -71,3 +69,19 @@ bool zmq::xreq_t::xhas_out ()
     return lb.has_out ();
 }
 
+void zmq::xreq_t::read_activated (pipe_t *pipe_)
+{
+    fq.activated (pipe_);
+}
+
+void zmq::xreq_t::write_activated (pipe_t *pipe_)
+{
+    lb.activated (pipe_);
+}
+
+void zmq::xreq_t::terminated (pipe_t *pipe_)
+{
+    fq.terminated (pipe_);
+    lb.terminated (pipe_);
+}
+
diff --git a/src/xreq.hpp b/src/xreq.hpp
index e0cafe5..a75e5c8 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -23,13 +23,16 @@
 #define __ZMQ_XREQ_HPP_INCLUDED__
 
 #include "socket_base.hpp"
+#include "pipe.hpp"
 #include "fq.hpp"
 #include "lb.hpp"
 
 namespace zmq
 {
 
-    class xreq_t : public socket_base_t
+    class xreq_t :
+        public socket_base_t,
+        public i_pipe_events
     {
     public:
 
@@ -39,8 +42,7 @@ namespace zmq
     protected:
 
         //  Overloads of functions from socket_base_t.
-        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
-            const blob_t &peer_identity_);
+        void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
         int xsend (class msg_t *msg_, int flags_);
         int xrecv (class msg_t *msg_, int flags_);
         bool xhas_in ();
@@ -48,6 +50,11 @@ namespace zmq
 
     private:
 
+        //  i_pipe_events interface implementation.
+        void read_activated (pipe_t *pipe_);
+        void write_activated (pipe_t *pipe_);
+        void terminated (pipe_t *pipe_);
+
         //  Hook into the termination process.
         void process_term (int linger_);
 
diff --git a/src/xsub.cpp b/src/xsub.cpp
index b0e8cd2..dc30d71 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -30,8 +30,6 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) :
     more (false)
 {
     options.type = ZMQ_XSUB;
-    options.requires_in = true;
-    options.requires_out = false;
     int rc = message.init ();
     errno_assert (rc == 0);
 }
@@ -42,11 +40,27 @@ zmq::xsub_t::~xsub_t ()
     errno_assert (rc == 0);
 }
 
-void zmq::xsub_t::xattach_pipes (class reader_t *inpipe_,
-    class writer_t *outpipe_, const blob_t &peer_identity_)
+void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
 {
-    zmq_assert (inpipe_ && !outpipe_);
-    fq.attach (inpipe_);
+    zmq_assert (pipe_);
+    pipe_->set_event_sink (this);
+    fq.attach (pipe_);
+}
+
+void zmq::xsub_t::read_activated (pipe_t *pipe_)
+{
+    fq.activated (pipe_);
+}
+
+void zmq::xsub_t::write_activated (pipe_t *pipe_)
+{
+    //  SUB socket never sends messages. This should never happen.
+    zmq_assert (false);
+}
+
+void zmq::xsub_t::terminated (pipe_t *pipe_)
+{
+    fq.terminated (pipe_);
 }
 
 void zmq::xsub_t::process_term (int linger_)
diff --git a/src/xsub.hpp b/src/xsub.hpp
index 202a29f..ed9c462 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -23,13 +23,16 @@
 
 #include "trie.hpp"
 #include "socket_base.hpp"
+#include "pipe.hpp"
 #include "msg.hpp"
 #include "fq.hpp"
 
 namespace zmq
 {
 
-    class xsub_t : public socket_base_t
+    class xsub_t :
+        public socket_base_t,
+        public i_pipe_events
     {
     public:
 
@@ -39,8 +42,7 @@ namespace zmq
     protected:
 
         //  Overloads of functions from socket_base_t.
-        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
-            const blob_t &peer_identity_);
+        void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
         int xsend (class msg_t *msg_, int options_);
         bool xhas_out ();
         int xrecv (class msg_t *msg_, int flags_);
@@ -48,6 +50,11 @@ namespace zmq
 
     private:
 
+        //  i_pipe_events interface implementation.
+        void read_activated (pipe_t *pipe_);
+        void write_activated (pipe_t *pipe_);
+        void terminated (pipe_t *pipe_);
+
         //  Hook into the termination process.
         void process_term (int linger_);
 
-- 
cgit v1.2.3