diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/array.hpp | 79 | ||||
| -rw-r--r-- | src/command.hpp | 11 | ||||
| -rw-r--r-- | src/connect_session.cpp | 1 | ||||
| -rw-r--r-- | src/dist.cpp | 10 | ||||
| -rw-r--r-- | src/dist.hpp | 13 | ||||
| -rw-r--r-- | src/fq.cpp | 12 | ||||
| -rw-r--r-- | src/fq.hpp | 12 | ||||
| -rw-r--r-- | src/lb.cpp | 8 | ||||
| -rw-r--r-- | src/lb.hpp | 14 | ||||
| -rw-r--r-- | src/object.cpp | 40 | ||||
| -rw-r--r-- | src/object.hpp | 19 | ||||
| -rw-r--r-- | src/options.cpp | 2 | ||||
| -rw-r--r-- | src/options.hpp | 5 | ||||
| -rw-r--r-- | src/own.cpp | 3 | ||||
| -rw-r--r-- | src/pair.cpp | 98 | ||||
| -rw-r--r-- | src/pair.hpp | 24 | ||||
| -rw-r--r-- | src/pipe.cpp | 335 | ||||
| -rw-r--r-- | src/pipe.hpp | 201 | ||||
| -rw-r--r-- | src/pull.cpp | 26 | ||||
| -rw-r--r-- | src/pull.hpp | 13 | ||||
| -rw-r--r-- | src/push.cpp | 26 | ||||
| -rw-r--r-- | src/push.hpp | 13 | ||||
| -rw-r--r-- | src/session.cpp | 167 | ||||
| -rw-r--r-- | src/session.hpp | 29 | ||||
| -rw-r--r-- | src/socket_base.cpp | 73 | ||||
| -rw-r--r-- | src/socket_base.hpp | 12 | ||||
| -rw-r--r-- | src/xpub.cpp | 26 | ||||
| -rw-r--r-- | src/xpub.hpp | 12 | ||||
| -rw-r--r-- | src/xrep.cpp | 90 | ||||
| -rw-r--r-- | src/xrep.hpp | 24 | ||||
| -rw-r--r-- | src/xreq.cpp | 28 | ||||
| -rw-r--r-- | src/xreq.hpp | 13 | ||||
| -rw-r--r-- | src/xsub.cpp | 26 | ||||
| -rw-r--r-- | src/xsub.hpp | 13 | 
34 files changed, 709 insertions, 769 deletions
| diff --git a/src/array.hpp b/src/array.hpp index 1d18e48..e7b5266 100644 --- a/src/array.hpp +++ b/src/array.hpp @@ -28,14 +28,17 @@ namespace zmq  {      //  Base class for objects stored in the array. Note that each object can -    //  be stored in at most one array. +    //  be stored in at most two arrays. This is needed specifically in the +    //  case where single pipe object is stored both in array of inbound pipes +    //  and in the array of outbound pipes.      class array_item_t      {      public:          inline array_item_t () : -            array_index (-1) +            array_index1 (-1), +            array_index2 (-1)          {          } @@ -45,19 +48,30 @@ namespace zmq          {          } -        inline void set_array_index (int index_) +        inline void set_array_index1 (int index_)          { -            array_index = index_; +            array_index1 = index_;          } -        inline int get_array_index () +        inline int get_array_index1 ()          { -            return array_index; +            return array_index1; +        } + +        inline void set_array_index2 (int index_) +        { +            array_index2 = index_; +        } + +        inline int get_array_index2 () +        { +            return array_index2;          }      private: -        int array_index; +        int array_index1; +        int array_index2;          array_item_t (const array_item_t&);          const array_item_t &operator = (const array_item_t&); @@ -65,9 +79,11 @@ namespace zmq      //  Fast array implementation with O(1) access to item, insertion and      //  removal. Array stores pointers rather than objects. The objects have -    //  to be derived from array_item_t class. +    //  to be derived from array_item_t class, thus they can be stored in +    //  two arrays. Template parameter N specifies which index in array_item_t +    //  to use. -    template <typename T> class array_t +    template <typename T, int N = 1> class array_t      {      public: @@ -98,28 +114,48 @@ namespace zmq          inline void push_back (T *item_)          { -            if (item_) -                item_->set_array_index ((int) items.size ()); +            if (item_) { +                if (N == 1) +                    item_->set_array_index1 ((int) items.size ()); +                else +                    item_->set_array_index2 ((int) items.size ()); +            }              items.push_back (item_);          } -        inline void erase (T *item_) { -            erase (item_->get_array_index ()); +        inline void erase (T *item_) +        { +            if (N == 1) +                erase (item_->get_array_index1 ()); +            else +                erase (item_->get_array_index2 ());          }          inline void erase (size_type index_) { -            if (items.back ()) -                items.back ()->set_array_index ((int) index_); +            if (items.back ()) { +                if (N == 1) +                    items.back ()->set_array_index1 ((int) index_); +                else +                    items.back ()->set_array_index2 ((int) index_); +            }              items [index_] = items.back ();              items.pop_back ();          }          inline void swap (size_type index1_, size_type index2_)          { -            if (items [index1_]) -                items [index1_]->set_array_index ((int) index2_); -            if (items [index2_]) -                items [index2_]->set_array_index ((int) index1_); +            if (N == 1) { +		        if (items [index1_]) +		            items [index1_]->set_array_index1 ((int) index2_); +		        if (items [index2_]) +		            items [index2_]->set_array_index1 ((int) index1_); +            } +            else { +		        if (items [index1_]) +		            items [index1_]->set_array_index2 ((int) index2_); +		        if (items [index2_]) +		            items [index2_]->set_array_index2 ((int) index1_); +            }              std::swap (items [index1_], items [index2_]);          } @@ -130,7 +166,10 @@ namespace zmq          inline size_type index (T *item_)          { -            return (size_type) item_->get_array_index (); +            if (N == 1) +                return (size_type) item_->get_array_index1 (); +            else +                return (size_type) item_->get_array_index2 ();          }      private: diff --git a/src/command.hpp b/src/command.hpp index 35aed0f..ff7b551 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -40,8 +40,8 @@ namespace zmq              own,              attach,              bind, -            activate_reader, -            activate_writer, +            activate_read, +            activate_write,              pipe_term,              pipe_term_ack,              term_req, @@ -79,8 +79,7 @@ namespace zmq              //  Sent from session to socket to establish pipe(s) between them.              //  Caller have used inc_seqnum beforehand sending the command.              struct { -                class reader_t *in_pipe; -                class writer_t *out_pipe; +                class pipe_t *pipe;                  unsigned char peer_identity_size;                  unsigned char *peer_identity;              } bind; @@ -88,13 +87,13 @@ namespace zmq              //  Sent by pipe writer to inform dormant pipe reader that there              //  are messages in the pipe.              struct { -            } activate_reader; +            } activate_read;              //  Sent by pipe reader to inform pipe writer about how many              //  messages it has read so far.              struct {                  uint64_t msgs_read; -            } activate_writer; +            } activate_write;              //  Sent by pipe reader to pipe writer to ask it to terminate              //  its end of the pipe. diff --git a/src/connect_session.cpp b/src/connect_session.cpp index c0951cb..9a29bf1 100644 --- a/src/connect_session.cpp +++ b/src/connect_session.cpp @@ -22,6 +22,7 @@  #include "zmq_connecter.hpp"  #include "pgm_sender.hpp"  #include "pgm_receiver.hpp" +#include "err.hpp"  zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_,        class socket_base_t *socket_, const options_t &options_, diff --git a/src/dist.cpp b/src/dist.cpp index 7c15bfd..3afa196 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -39,10 +39,8 @@ zmq::dist_t::~dist_t ()      zmq_assert (pipes.empty ());  } -void zmq::dist_t::attach (writer_t *pipe_) +void zmq::dist_t::attach (pipe_t *pipe_)  { -    pipe_->set_event_sink (this); -      //  If we are in the middle of sending a message, we'll add new pipe      //  into the list of eligible pipes. Otherwise we add it to the list      //  of active pipes. @@ -74,7 +72,7 @@ void zmq::dist_t::terminate ()          pipes [i]->terminate ();  } -void zmq::dist_t::terminated (writer_t *pipe_) +void zmq::dist_t::terminated (pipe_t *pipe_)  {      //  Remove the pipe from the list; adjust number of active and/or      //  eligible pipes accordingly. @@ -88,7 +86,7 @@ void zmq::dist_t::terminated (writer_t *pipe_)          sink->unregister_term_ack ();  } -void zmq::dist_t::activated (writer_t *pipe_) +void zmq::dist_t::activated (pipe_t *pipe_)  {      //  Move the pipe from passive to eligible state.      pipes.swap (pipes.index (pipe_), eligible); @@ -153,7 +151,7 @@ bool zmq::dist_t::has_out ()      return true;  } -bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) +bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)  {      if (!pipe_->write (msg_)) {          pipes.swap (pipes.index (pipe_), active - 1); diff --git a/src/dist.hpp b/src/dist.hpp index fd522b9..c137332 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -31,33 +31,32 @@ namespace zmq      //  Class manages a set of outbound pipes. It sends each messages to      //  each of them. -    class dist_t : public i_writer_events +    class dist_t      {      public:          dist_t (class own_t *sink_);          ~dist_t (); -        void attach (writer_t *pipe_); +        void attach (class pipe_t *pipe_);          void terminate ();          int send (class msg_t *msg_, int flags_);          bool has_out (); -        //  i_writer_events interface implementation. -        void activated (writer_t *pipe_); -        void terminated (writer_t *pipe_); +        void activated (class pipe_t *pipe_); +        void terminated (class pipe_t *pipe_);      private:          //  Write the message to the pipe. Make the pipe inactive if writing          //  fails. In such a case false is returned. -        bool write (class writer_t *pipe_, class msg_t *msg_); +        bool write (class pipe_t *pipe_, class msg_t *msg_);          //  Put the message to all active pipes.          void distribute (class msg_t *msg_, int flags_);          //  List of outbound pipes. -        typedef array_t <class writer_t> pipes_t; +        typedef array_t <class pipe_t, 2> pipes_t;          pipes_t pipes;          //  Number of active pipes. All the active pipes are located at the @@ -38,10 +38,8 @@ zmq::fq_t::~fq_t ()      zmq_assert (pipes.empty ());  } -void zmq::fq_t::attach (reader_t *pipe_) +void zmq::fq_t::attach (pipe_t *pipe_)  { -    pipe_->set_event_sink (this); -      pipes.push_back (pipe_);      pipes.swap (active, pipes.size () - 1);      active++; @@ -53,7 +51,7 @@ void zmq::fq_t::attach (reader_t *pipe_)      }  } -void zmq::fq_t::terminated (reader_t *pipe_) +void zmq::fq_t::terminated (pipe_t *pipe_)  {      //  Make sure that we are not closing current pipe while      //  message is half-read. @@ -72,10 +70,6 @@ void zmq::fq_t::terminated (reader_t *pipe_)          sink->unregister_term_ack ();  } -void zmq::fq_t::delimited (reader_t *pipe_) -{ -} -  void zmq::fq_t::terminate ()  {      zmq_assert (!terminating); @@ -86,7 +80,7 @@ void zmq::fq_t::terminate ()          pipes [i]->terminate ();  } -void zmq::fq_t::activated (reader_t *pipe_) +void zmq::fq_t::activated (pipe_t *pipe_)  {      //  Move the pipe to the list of active pipes.      pipes.swap (pipes.index (pipe_), active); @@ -31,28 +31,26 @@ namespace zmq      //  Class manages a set of inbound pipes. On receive it performs fair      //  queueing (RFC970) so that senders gone berserk won't cause denial of      //  service for decent senders. -    class fq_t : public i_reader_events +    class fq_t      {      public:          fq_t (class own_t *sink_);          ~fq_t (); -        void attach (reader_t *pipe_); +        void attach (pipe_t *pipe_);          void terminate ();          int recv (msg_t *msg_, int flags_);          bool has_in (); -        //  i_reader_events implementation. -        void activated (reader_t *pipe_); -        void terminated (reader_t *pipe_); -        void delimited (reader_t *pipe_); +        void activated (pipe_t *pipe_); +        void terminated (pipe_t *pipe_);      private:          //  Inbound pipes. -        typedef array_t <reader_t> pipes_t; +        typedef array_t <pipe_t, 1> pipes_t;          pipes_t pipes;          //  Number of active pipes. All the active pipes are located at the @@ -39,10 +39,8 @@ zmq::lb_t::~lb_t ()      zmq_assert (pipes.empty ());  } -void zmq::lb_t::attach (writer_t *pipe_) +void zmq::lb_t::attach (pipe_t *pipe_)  { -    pipe_->set_event_sink (this); -      pipes.push_back (pipe_);      pipes.swap (active, pipes.size () - 1);      active++; @@ -63,7 +61,7 @@ void zmq::lb_t::terminate ()          pipes [i]->terminate ();  } -void zmq::lb_t::terminated (writer_t *pipe_) +void zmq::lb_t::terminated (pipe_t *pipe_)  {      pipes_t::size_type index = pipes.index (pipe_); @@ -85,7 +83,7 @@ void zmq::lb_t::terminated (writer_t *pipe_)          sink->unregister_term_ack ();  } -void zmq::lb_t::activated (writer_t *pipe_) +void zmq::lb_t::activated (pipe_t *pipe_)  {      //  Move the pipe to the list of active pipes.      pipes.swap (pipes.index (pipe_), active); @@ -27,28 +27,28 @@  namespace zmq  { -    //  Class manages a set of outbound pipes. On send it load balances +    //  This class manages a set of outbound pipes. On send it load balances      //  messages fairly among the pipes. -    class lb_t : public i_writer_events + +    class lb_t      {      public:          lb_t (class own_t *sink_);          ~lb_t (); -        void attach (writer_t *pipe_); +        void attach (pipe_t *pipe_);          void terminate ();          int send (msg_t *msg_, int flags_);          bool has_out (); -        //  i_writer_events interface implementation. -        void activated (writer_t *pipe_); -        void terminated (writer_t *pipe_); +        void activated (pipe_t *pipe_); +        void terminated (pipe_t *pipe_);      private:          //  List of outbound pipes. -        typedef array_t <class writer_t> pipes_t; +        typedef array_t <class pipe_t, 2> pipes_t;          pipes_t pipes;          //  Number of active pipes. All the active pipes are located at the diff --git a/src/object.cpp b/src/object.cpp index e2ca6d6..0a06d5f 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -59,12 +59,12 @@ void zmq::object_t::process_command (command_t &cmd_)  {      switch (cmd_.type) { -    case command_t::activate_reader: -        process_activate_reader (); +    case command_t::activate_read: +        process_activate_read ();          break; -    case command_t::activate_writer: -        process_activate_writer (cmd_.args.activate_writer.msgs_read); +    case command_t::activate_write: +        process_activate_write (cmd_.args.activate_write.msgs_read);          break;      case command_t::stop: @@ -90,8 +90,8 @@ void zmq::object_t::process_command (command_t &cmd_)          break;      case command_t::bind: -        process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe, -            cmd_.args.bind.peer_identity ? blob_t (cmd_.args.bind.peer_identity, +        process_bind (cmd_.args.bind.pipe, cmd_.args.bind.peer_identity ? +            blob_t (cmd_.args.bind.peer_identity,              cmd_.args.bind.peer_identity_size) : blob_t ());          process_seqnum ();          break; @@ -236,8 +236,8 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,      send_command (cmd);  } -void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_, -    writer_t *out_pipe_, const blob_t &peer_identity_, bool inc_seqnum_) +void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_, +    const blob_t &peer_identity_, bool inc_seqnum_)  {      if (inc_seqnum_)          destination_->inc_seqnum (); @@ -248,8 +248,7 @@ void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_,  #endif      cmd.destination = destination_;      cmd.type = command_t::bind; -    cmd.args.bind.in_pipe = in_pipe_; -    cmd.args.bind.out_pipe = out_pipe_; +    cmd.args.bind.pipe = pipe_;      if (peer_identity_.empty ()) {          cmd.args.bind.peer_identity_size = 0;          cmd.args.bind.peer_identity = NULL; @@ -267,18 +266,18 @@ void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_,      send_command (cmd);  } -void zmq::object_t::send_activate_reader (reader_t *destination_) +void zmq::object_t::send_activate_read (pipe_t *destination_)  {      command_t cmd;  #if defined ZMQ_MAKE_VALGRIND_HAPPY      memset (&cmd, 0, sizeof (cmd));  #endif      cmd.destination = destination_; -    cmd.type = command_t::activate_reader; +    cmd.type = command_t::activate_read;      send_command (cmd);  } -void zmq::object_t::send_activate_writer (writer_t *destination_, +void zmq::object_t::send_activate_write (pipe_t *destination_,      uint64_t msgs_read_)  {      command_t cmd; @@ -286,12 +285,12 @@ void zmq::object_t::send_activate_writer (writer_t *destination_,      memset (&cmd, 0, sizeof (cmd));  #endif      cmd.destination = destination_; -    cmd.type = command_t::activate_writer; -    cmd.args.activate_writer.msgs_read = msgs_read_; +    cmd.type = command_t::activate_write; +    cmd.args.activate_write.msgs_read = msgs_read_;      send_command (cmd);  } -void zmq::object_t::send_pipe_term (writer_t *destination_) +void zmq::object_t::send_pipe_term (pipe_t *destination_)  {      command_t cmd;  #if defined ZMQ_MAKE_VALGRIND_HAPPY @@ -302,7 +301,7 @@ void zmq::object_t::send_pipe_term (writer_t *destination_)      send_command (cmd);  } -void zmq::object_t::send_pipe_term_ack (reader_t *destination_) +void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)  {      command_t cmd;  #if defined ZMQ_MAKE_VALGRIND_HAPPY @@ -404,18 +403,17 @@ void zmq::object_t::process_attach (i_engine *engine_,      zmq_assert (false);  } -void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, -    const blob_t &peer_identity_) +void zmq::object_t::process_bind (pipe_t *pipe_, const blob_t &peer_identity_)  {      zmq_assert (false);  } -void zmq::object_t::process_activate_reader () +void zmq::object_t::process_activate_read ()  {      zmq_assert (false);  } -void zmq::object_t::process_activate_writer (uint64_t msgs_read_) +void zmq::object_t::process_activate_write (uint64_t msgs_read_)  {      zmq_assert (false);  } diff --git a/src/object.hpp b/src/object.hpp index 0f5e61b..0f47670 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -66,14 +66,13 @@ namespace zmq          void send_attach (class session_t *destination_,               struct i_engine *engine_, const blob_t &peer_identity_,               bool inc_seqnum_ = true); -        void send_bind (class own_t *destination_, -             class reader_t *in_pipe_, class writer_t *out_pipe_, +        void send_bind (class own_t *destination_, class pipe_t *pipe_,               const blob_t &peer_identity_, bool inc_seqnum_ = true); -        void send_activate_reader (class reader_t *destination_); -        void send_activate_writer (class writer_t *destination_, +        void send_activate_read (class pipe_t *destination_); +        void send_activate_write (class pipe_t *destination_,               uint64_t msgs_read_); -        void send_pipe_term (class writer_t *destination_); -        void send_pipe_term_ack (class reader_t *destination_); +        void send_pipe_term (class pipe_t *destination_); +        void send_pipe_term_ack (class pipe_t *destination_);          void send_term_req (class own_t *destination_,              class own_t *object_);          void send_term (class own_t *destination_, int linger_); @@ -89,10 +88,10 @@ namespace zmq          virtual void process_own (class own_t *object_);          virtual void process_attach (struct i_engine *engine_,              const blob_t &peer_identity_); -        virtual void process_bind (class reader_t *in_pipe_, -            class writer_t *out_pipe_, const blob_t &peer_identity_); -        virtual void process_activate_reader (); -        virtual void process_activate_writer (uint64_t msgs_read_); +        virtual void process_bind (class pipe_t *pipe_, +            const blob_t &peer_identity_); +        virtual void process_activate_read (); +        virtual void process_activate_write (uint64_t msgs_read_);          virtual void process_pipe_term ();          virtual void process_pipe_term_ack ();          virtual void process_term_req (class own_t *object_); diff --git a/src/options.cpp b/src/options.cpp index 897e0f5..271ebdb 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -38,8 +38,6 @@ zmq::options_t::options_t () :      reconnect_ivl_max (0),      backlog (100),      maxmsgsize (-1), -    requires_in (false), -    requires_out (false),      immediate_connect (true)  {  } diff --git a/src/options.hpp b/src/options.hpp index 53d0197..fd39a74 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -75,11 +75,6 @@ namespace zmq          //  Maximal size of message to handle.          int64_t maxmsgsize; -        //  These options are never set by the user directly. Instead they are -        //  provided by the specific socket type. -        bool requires_in; -        bool requires_out; -          //  If true, when connecting, pipes are created immediately without          //  waiting for the connection to be established. That way the socket          //  is not aware of the peer's identity, however, it is able to send diff --git a/src/own.cpp b/src/own.cpp index 4cbfdd6..cdf20a4 100644 --- a/src/own.cpp +++ b/src/own.cpp @@ -173,6 +173,7 @@ void zmq::own_t::process_term (int linger_)  void zmq::own_t::register_term_acks (int count_)  {      term_acks += count_; +    printf ("reg %d acks (%p, %d)\n", count_, (void*) this, term_acks);  }  void zmq::own_t::unregister_term_ack () @@ -180,6 +181,8 @@ void zmq::own_t::unregister_term_ack ()      zmq_assert (term_acks > 0);      term_acks--; +    printf ("unreg 1 acks (%p, %d)\n", (void*) this, term_acks); +      //  This may be a last ack we are waiting for before termination...      check_term_acks ();   } diff --git a/src/pair.cpp b/src/pair.cpp index d877b54..93a4327 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -25,111 +25,72 @@  zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) :      socket_base_t (parent_, tid_), -    inpipe (NULL), -    outpipe (NULL), -    inpipe_alive (false), -    outpipe_alive (false), +    pipe (NULL),      terminating (false)  {      options.type = ZMQ_PAIR; -    options.requires_in = true; -    options.requires_out = true;  }  zmq::pair_t::~pair_t ()  { -    zmq_assert (!inpipe); -    zmq_assert (!outpipe); +    zmq_assert (!pipe);  } -void zmq::pair_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, -    const blob_t &peer_identity_) +void zmq::pair_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)  { -    zmq_assert (!inpipe && !outpipe); +    zmq_assert (!pipe); -    inpipe = inpipe_; -    inpipe_alive = true; -    inpipe->set_event_sink (this); - -    outpipe = outpipe_; -    outpipe_alive = true; -    outpipe->set_event_sink (this); +    pipe = pipe_; +    pipe->set_event_sink (this);      if (terminating) { -        register_term_acks (2); -        inpipe_->terminate (); -        outpipe_->terminate (); +        register_term_acks (1); +        pipe_->terminate ();      }  } -void zmq::pair_t::terminated (reader_t *pipe_) -{ -    zmq_assert (pipe_ == inpipe); -    inpipe = NULL; -    inpipe_alive = false; - -    if (terminating) -        unregister_term_ack (); -} - -void zmq::pair_t::terminated (writer_t *pipe_) +void zmq::pair_t::terminated (pipe_t *pipe_)  { -    zmq_assert (pipe_ == outpipe); -    outpipe = NULL; -    outpipe_alive = false; +    zmq_assert (pipe_ == pipe); +    pipe = NULL;      if (terminating)          unregister_term_ack ();  } -void  zmq::pair_t::delimited (reader_t *pipe_) -{ -} -  void zmq::pair_t::process_term (int linger_)  {      terminating = true; -    if (inpipe) { +    if (pipe) {          register_term_acks (1); -        inpipe->terminate (); -    } - -    if (outpipe) { -        register_term_acks (1); -        outpipe->terminate (); +        pipe->terminate ();      }      socket_base_t::process_term (linger_);  } -void zmq::pair_t::activated (class reader_t *pipe_) +void zmq::pair_t::read_activated (pipe_t *pipe_)  { -    zmq_assert (!inpipe_alive); -    inpipe_alive = true; +    //  There's just one pipe. No lists of active and inactive pipes. +    //  There's nothing to do here.  } -void zmq::pair_t::activated (class writer_t *pipe_) +void zmq::pair_t::write_activated (pipe_t *pipe_)  { -    zmq_assert (!outpipe_alive); -    outpipe_alive = true; +    //  There's just one pipe. No lists of active and inactive pipes. +    //  There's nothing to do here.  }  int zmq::pair_t::xsend (msg_t *msg_, int flags_)  { -    if (outpipe == NULL || !outpipe_alive) { -        errno = EAGAIN; -        return -1; -    } - -    if (!outpipe->write (msg_)) { -        outpipe_alive = false; +    if (!pipe || !pipe->write (msg_)) {          errno = EAGAIN;          return -1;      }      if (!(flags_ & ZMQ_SNDMORE)) -        outpipe->flush (); +        pipe->flush ();      //  Detach the original message from the data buffer.      int rc = msg_->init (); @@ -144,14 +105,12 @@ int zmq::pair_t::xrecv (msg_t *msg_, int flags_)      int rc = msg_->close ();      errno_assert (rc == 0); -    if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) { - -        //  No message is available. -        inpipe_alive = false; +    if (!pipe || !pipe->read (msg_)) {          //  Initialise the output parameter to be a 0-byte message.          rc = msg_->init ();          errno_assert (rc == 0); +          errno = EAGAIN;          return -1;      } @@ -160,24 +119,23 @@ int zmq::pair_t::xrecv (msg_t *msg_, int flags_)  bool zmq::pair_t::xhas_in ()  { -    if (!inpipe || !inpipe_alive) +    if (!pipe)          return false; -    inpipe_alive = inpipe->check_read (); -    return inpipe_alive; +    return pipe->check_read ();  }  bool zmq::pair_t::xhas_out ()  { -    if (!outpipe || !outpipe_alive) +    if (!pipe)          return false; | 
