diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-05-22 17:26:53 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-05-22 17:26:53 +0200 |
commit | acf0b0e515515e51ad32ba7a2d147ce703579478 (patch) | |
tree | d2032009cf46c23aa0f677c2216914f718ab968a /src | |
parent | 9e6b39925603f9e64db08c469bd628d7ef9465de (diff) |
Introduces bi-directional pipes
So far, there was a pair of unidirectional pipes between a socket
and a session (or an inproc peer). This resulted in complex
problems with half-closed states and tracking which inpipe
corresponds to which outpipe.
This patch doesn't add any functionality in itself, but is
essential for further work on features like subscription
forwarding.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/array.hpp | 79 | ||||
-rw-r--r-- | src/command.hpp | 11 | ||||
-rw-r--r-- | src/connect_session.cpp | 1 | ||||
-rw-r--r-- | src/dist.cpp | 10 | ||||
-rw-r--r-- | src/dist.hpp | 13 | ||||
-rw-r--r-- | src/fq.cpp | 12 | ||||
-rw-r--r-- | src/fq.hpp | 12 | ||||
-rw-r--r-- | src/lb.cpp | 8 | ||||
-rw-r--r-- | src/lb.hpp | 14 | ||||
-rw-r--r-- | src/object.cpp | 40 | ||||
-rw-r--r-- | src/object.hpp | 19 | ||||
-rw-r--r-- | src/options.cpp | 2 | ||||
-rw-r--r-- | src/options.hpp | 5 | ||||
-rw-r--r-- | src/own.cpp | 3 | ||||
-rw-r--r-- | src/pair.cpp | 98 | ||||
-rw-r--r-- | src/pair.hpp | 24 | ||||
-rw-r--r-- | src/pipe.cpp | 335 | ||||
-rw-r--r-- | src/pipe.hpp | 201 | ||||
-rw-r--r-- | src/pull.cpp | 26 | ||||
-rw-r--r-- | src/pull.hpp | 13 | ||||
-rw-r--r-- | src/push.cpp | 26 | ||||
-rw-r--r-- | src/push.hpp | 13 | ||||
-rw-r--r-- | src/session.cpp | 167 | ||||
-rw-r--r-- | src/session.hpp | 29 | ||||
-rw-r--r-- | src/socket_base.cpp | 73 | ||||
-rw-r--r-- | src/socket_base.hpp | 12 | ||||
-rw-r--r-- | src/xpub.cpp | 26 | ||||
-rw-r--r-- | src/xpub.hpp | 12 | ||||
-rw-r--r-- | src/xrep.cpp | 90 | ||||
-rw-r--r-- | src/xrep.hpp | 24 | ||||
-rw-r--r-- | src/xreq.cpp | 28 | ||||
-rw-r--r-- | src/xreq.hpp | 13 | ||||
-rw-r--r-- | src/xsub.cpp | 26 | ||||
-rw-r--r-- | src/xsub.hpp | 13 |
34 files changed, 709 insertions, 769 deletions
diff --git a/src/array.hpp b/src/array.hpp index 1d18e48..e7b5266 100644 --- a/src/array.hpp +++ b/src/array.hpp @@ -28,14 +28,17 @@ namespace zmq { // Base class for objects stored in the array. Note that each object can - // be stored in at most one array. + // be stored in at most two arrays. This is needed specifically in the + // case where single pipe object is stored both in array of inbound pipes + // and in the array of outbound pipes. class array_item_t { public: inline array_item_t () : - array_index (-1) + array_index1 (-1), + array_index2 (-1) { } @@ -45,19 +48,30 @@ namespace zmq { } - inline void set_array_index (int index_) + inline void set_array_index1 (int index_) { - array_index = index_; + array_index1 = index_; } - inline int get_array_index () + inline int get_array_index1 () { - return array_index; + return array_index1; + } + + inline void set_array_index2 (int index_) + { + array_index2 = index_; + } + + inline int get_array_index2 () + { + return array_index2; } private: - int array_index; + int array_index1; + int array_index2; array_item_t (const array_item_t&); const array_item_t &operator = (const array_item_t&); @@ -65,9 +79,11 @@ namespace zmq // Fast array implementation with O(1) access to item, insertion and // removal. Array stores pointers rather than objects. The objects have - // to be derived from array_item_t class. + // to be derived from array_item_t class, thus they can be stored in + // two arrays. Template parameter N specifies which index in array_item_t + // to use. - template <typename T> class array_t + template <typename T, int N = 1> class array_t { public: @@ -98,28 +114,48 @@ namespace zmq inline void push_back (T *item_) { - if (item_) - item_->set_array_index ((int) items.size ()); + if (item_) { + if (N == 1) + item_->set_array_index1 ((int) items.size ()); + else + item_->set_array_index2 ((int) items.size ()); + } items.push_back (item_); } - inline void erase (T *item_) { - erase (item_->get_array_index ()); + inline void erase (T *item_) + { + if (N == 1) + erase (item_->get_array_index1 ()); + else + erase (item_->get_array_index2 ()); } inline void erase (size_type index_) { - if (items.back ()) - items.back ()->set_array_index ((int) index_); + if (items.back ()) { + if (N == 1) + items.back ()->set_array_index1 ((int) index_); + else + items.back ()->set_array_index2 ((int) index_); + } items [index_] = items.back (); items.pop_back (); } inline void swap (size_type index1_, size_type index2_) { - if (items [index1_]) - items [index1_]->set_array_index ((int) index2_); - if (items [index2_]) - items [index2_]->set_array_index ((int) index1_); + if (N == 1) { + if (items [index1_]) + items [index1_]->set_array_index1 ((int) index2_); + if (items [index2_]) + items [index2_]->set_array_index1 ((int) index1_); + } + else { + if (items [index1_]) + items [index1_]->set_array_index2 ((int) index2_); + if (items [index2_]) + items [index2_]->set_array_index2 ((int) index1_); + } std::swap (items [index1_], items [index2_]); } @@ -130,7 +166,10 @@ namespace zmq inline size_type index (T *item_) { - return (size_type) item_->get_array_index (); + if (N == 1) + return (size_type) item_->get_array_index1 (); + else + return (size_type) item_->get_array_index2 (); } private: diff --git a/src/command.hpp b/src/command.hpp index 35aed0f..ff7b551 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -40,8 +40,8 @@ namespace zmq own, attach, bind, - activate_reader, - activate_writer, + activate_read, + activate_write, pipe_term, pipe_term_ack, term_req, @@ -79,8 +79,7 @@ namespace zmq // Sent from session to socket to establish pipe(s) between them. // Caller have used inc_seqnum beforehand sending the command. struct { - class reader_t *in_pipe; - class writer_t *out_pipe; + class pipe_t *pipe; unsigned char peer_identity_size; unsigned char *peer_identity; } bind; @@ -88,13 +87,13 @@ namespace zmq // Sent by pipe writer to inform dormant pipe reader that there // are messages in the pipe. struct { - } activate_reader; + } activate_read; // Sent by pipe reader to inform pipe writer about how many // messages it has read so far. struct { uint64_t msgs_read; - } activate_writer; + } activate_write; // Sent by pipe reader to pipe writer to ask it to terminate // its end of the pipe. diff --git a/src/connect_session.cpp b/src/connect_session.cpp index c0951cb..9a29bf1 100644 --- a/src/connect_session.cpp +++ b/src/connect_session.cpp @@ -22,6 +22,7 @@ #include "zmq_connecter.hpp" #include "pgm_sender.hpp" #include "pgm_receiver.hpp" +#include "err.hpp" zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_, class socket_base_t *socket_, const options_t &options_, diff --git a/src/dist.cpp b/src/dist.cpp index 7c15bfd..3afa196 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -39,10 +39,8 @@ zmq::dist_t::~dist_t () zmq_assert (pipes.empty ()); } -void zmq::dist_t::attach (writer_t *pipe_) +void zmq::dist_t::attach (pipe_t *pipe_) { - pipe_->set_event_sink (this); - // If we are in the middle of sending a message, we'll add new pipe // into the list of eligible pipes. Otherwise we add it to the list // of active pipes. @@ -74,7 +72,7 @@ void zmq::dist_t::terminate () pipes [i]->terminate (); } -void zmq::dist_t::terminated (writer_t *pipe_) +void zmq::dist_t::terminated (pipe_t *pipe_) { // Remove the pipe from the list; adjust number of active and/or // eligible pipes accordingly. @@ -88,7 +86,7 @@ void zmq::dist_t::terminated (writer_t *pipe_) sink->unregister_term_ack (); } -void zmq::dist_t::activated (writer_t *pipe_) +void zmq::dist_t::activated (pipe_t *pipe_) { // Move the pipe from passive to eligible state. pipes.swap (pipes.index (pipe_), eligible); @@ -153,7 +151,7 @@ bool zmq::dist_t::has_out () return true; } -bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) +bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_) { if (!pipe_->write (msg_)) { pipes.swap (pipes.index (pipe_), active - 1); diff --git a/src/dist.hpp b/src/dist.hpp index fd522b9..c137332 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -31,33 +31,32 @@ namespace zmq // Class manages a set of outbound pipes. It sends each messages to // each of them. - class dist_t : public i_writer_events + class dist_t { public: dist_t (class own_t *sink_); ~dist_t (); - void attach (writer_t *pipe_); + void attach (class pipe_t *pipe_); void terminate (); int send (class msg_t *msg_, int flags_); bool has_out (); - // i_writer_events interface implementation. - void activated (writer_t *pipe_); - void terminated (writer_t *pipe_); + void activated (class pipe_t *pipe_); + void terminated (class pipe_t *pipe_); private: // Write the message to the pipe. Make the pipe inactive if writing // fails. In such a case false is returned. - bool write (class writer_t *pipe_, class msg_t *msg_); + bool write (class pipe_t *pipe_, class msg_t *msg_); // Put the message to all active pipes. void distribute (class msg_t *msg_, int flags_); // List of outbound pipes. - typedef array_t <class writer_t> pipes_t; + typedef array_t <class pipe_t, 2> pipes_t; pipes_t pipes; // Number of active pipes. All the active pipes are located at the @@ -38,10 +38,8 @@ zmq::fq_t::~fq_t () zmq_assert (pipes.empty ()); } -void zmq::fq_t::attach (reader_t *pipe_) +void zmq::fq_t::attach (pipe_t *pipe_) { - pipe_->set_event_sink (this); - pipes.push_back (pipe_); pipes.swap (active, pipes.size () - 1); active++; @@ -53,7 +51,7 @@ void zmq::fq_t::attach (reader_t *pipe_) } } -void zmq::fq_t::terminated (reader_t *pipe_) +void zmq::fq_t::terminated (pipe_t *pipe_) { // Make sure that we are not closing current pipe while // message is half-read. @@ -72,10 +70,6 @@ void zmq::fq_t::terminated (reader_t *pipe_) sink->unregister_term_ack (); } -void zmq::fq_t::delimited (reader_t *pipe_) -{ -} - void zmq::fq_t::terminate () { zmq_assert (!terminating); @@ -86,7 +80,7 @@ void zmq::fq_t::terminate () pipes [i]->terminate (); } -void zmq::fq_t::activated (reader_t *pipe_) +void zmq::fq_t::activated (pipe_t *pipe_) { // Move the pipe to the list of active pipes. pipes.swap (pipes.index (pipe_), active); @@ -31,28 +31,26 @@ namespace zmq // Class manages a set of inbound pipes. On receive it performs fair // queueing (RFC970) so that senders gone berserk won't cause denial of // service for decent senders. - class fq_t : public i_reader_events + class fq_t { public: fq_t (class own_t *sink_); ~fq_t (); - void attach (reader_t *pipe_); + void attach (pipe_t *pipe_); void terminate (); int recv (msg_t *msg_, int flags_); bool has_in (); - // i_reader_events implementation. - void activated (reader_t *pipe_); - void terminated (reader_t *pipe_); - void delimited (reader_t *pipe_); + void activated (pipe_t *pipe_); + void terminated (pipe_t *pipe_); private: // Inbound pipes. - typedef array_t <reader_t> pipes_t; + typedef array_t <pipe_t, 1> pipes_t; pipes_t pipes; // Number of active pipes. All the active pipes are located at the @@ -39,10 +39,8 @@ zmq::lb_t::~lb_t () zmq_assert (pipes.empty ()); } -void zmq::lb_t::attach (writer_t *pipe_) +void zmq::lb_t::attach (pipe_t *pipe_) { - pipe_->set_event_sink (this); - pipes.push_back (pipe_); pipes.swap (active, pipes.size () - 1); active++; @@ -63,7 +61,7 @@ void zmq::lb_t::terminate () pipes [i]->terminate (); } -void zmq::lb_t::terminated (writer_t *pipe_) +void zmq::lb_t::terminated (pipe_t *pipe_) { pipes_t::size_type index = pipes.index (pipe_); @@ -85,7 +83,7 @@ void zmq::lb_t::terminated (writer_t *pipe_) sink->unregister_term_ack (); } -void zmq::lb_t::activated (writer_t *pipe_) +void zmq::lb_t::activated (pipe_t *pipe_) { // Move the pipe to the list of active pipes. pipes.swap (pipes.index (pipe_), active); @@ -27,28 +27,28 @@ namespace zmq { - // Class manages a set of outbound pipes. On send it load balances + // This class manages a set of outbound pipes. On send it load balances // messages fairly among the pipes. - class lb_t : public i_writer_events + + class lb_t { public: lb_t (class own_t *sink_); ~lb_t (); - void attach (writer_t *pipe_); + void attach (pipe_t *pipe_); void terminate (); int send (msg_t *msg_, int flags_); bool has_out (); - // i_writer_events interface implementation. - void activated (writer_t *pipe_); - void terminated (writer_t *pipe_); + void activated (pipe_t *pipe_); + void terminated (pipe_t *pipe_); private: // List of outbound pipes. - typedef array_t <class writer_t> pipes_t; + typedef array_t <class pipe_t, 2> pipes_t; pipes_t pipes; // Number of active pipes. All the active pipes are located at the diff --git a/src/object.cpp b/src/object.cpp index e2ca6d6..0a06d5f 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -59,12 +59,12 @@ void zmq::object_t::process_command (command_t &cmd_) { switch (cmd_.type) { - case command_t::activate_reader: - process_activate_reader (); + case command_t::activate_read: + process_activate_read (); break; - case command_t::activate_writer: - process_activate_writer (cmd_.args.activate_writer.msgs_read); + case command_t::activate_write: + process_activate_write (cmd_.args.activate_write.msgs_read); break; case command_t::stop: @@ -90,8 +90,8 @@ void zmq::object_t::process_command (command_t &cmd_) break; case command_t::bind: - process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe, - cmd_.args.bind.peer_identity ? blob_t (cmd_.args.bind.peer_identity, + process_bind (cmd_.args.bind.pipe, cmd_.args.bind.peer_identity ? + blob_t (cmd_.args.bind.peer_identity, cmd_.args.bind.peer_identity_size) : blob_t ()); process_seqnum (); break; @@ -236,8 +236,8 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, send_command (cmd); } -void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_, - writer_t *out_pipe_, const blob_t &peer_identity_, bool inc_seqnum_) +void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_, + const blob_t &peer_identity_, bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -248,8 +248,7 @@ void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_, #endif cmd.destination = destination_; cmd.type = command_t::bind; - cmd.args.bind.in_pipe = in_pipe_; - cmd.args.bind.out_pipe = out_pipe_; + cmd.args.bind.pipe = pipe_; if (peer_identity_.empty ()) { cmd.args.bind.peer_identity_size = 0; cmd.args.bind.peer_identity = NULL; @@ -267,18 +266,18 @@ void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_, send_command (cmd); } -void zmq::object_t::send_activate_reader (reader_t *destination_) +void zmq::object_t::send_activate_read (pipe_t *destination_) { command_t cmd; #if defined ZMQ_MAKE_VALGRIND_HAPPY memset (&cmd, 0, sizeof (cmd)); #endif cmd.destination = destination_; - cmd.type = command_t::activate_reader; + cmd.type = command_t::activate_read; send_command (cmd); } -void zmq::object_t::send_activate_writer (writer_t *destination_, +void zmq::object_t::send_activate_write (pipe_t *destination_, uint64_t msgs_read_) { command_t cmd; @@ -286,12 +285,12 @@ void zmq::object_t::send_activate_writer (writer_t *destination_, memset (&cmd, 0, sizeof (cmd)); #endif cmd.destination = destination_; - cmd.type = command_t::activate_writer; - cmd.args.activate_writer.msgs_read = msgs_read_; + cmd.type = command_t::activate_write; + cmd.args.activate_write.msgs_read = msgs_read_; send_command (cmd); } -void zmq::object_t::send_pipe_term (writer_t *destination_) +void zmq::object_t::send_pipe_term (pipe_t *destination_) { command_t cmd; #if defined ZMQ_MAKE_VALGRIND_HAPPY @@ -302,7 +301,7 @@ void zmq::object_t::send_pipe_term (writer_t *destination_) send_command (cmd); } -void zmq::object_t::send_pipe_term_ack (reader_t *destination_) +void zmq::object_t::send_pipe_term_ack (pipe_t *destination_) { command_t cmd; #if defined ZMQ_MAKE_VALGRIND_HAPPY @@ -404,18 +403,17 @@ void zmq::object_t::process_attach (i_engine *engine_, zmq_assert (false); } -void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, - const blob_t &peer_identity_) +void zmq::object_t::process_bind (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (false); } -void zmq::object_t::process_activate_reader () +void zmq::object_t::process_activate_read () { zmq_assert (false); } -void zmq::object_t::process_activate_writer (uint64_t msgs_read_) +void zmq::object_t::process_activate_write (uint64_t msgs_read_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index 0f5e61b..0f47670 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -66,14 +66,13 @@ namespace zmq void send_attach (class session_t *destination_, struct i_engine *engine_, const blob_t &peer_identity_, bool inc_seqnum_ = true); - void send_bind (class own_t *destination_, - class reader_t *in_pipe_, class writer_t *out_pipe_, + void send_bind (class own_t *destination_, class pipe_t *pipe_, const blob_t &peer_identity_, bool inc_seqnum_ = true); - void send_activate_reader (class reader_t *destination_); - void send_activate_writer (class writer_t *destination_, + void send_activate_read (class pipe_t *destination_); + void send_activate_write (class pipe_t *destination_, uint64_t msgs_read_); - void send_pipe_term (class writer_t *destination_); - void send_pipe_term_ack (class reader_t *destination_); + void send_pipe_term (class pipe_t *destination_); + void send_pipe_term_ack (class pipe_t *destination_); void send_term_req (class own_t *destination_, class own_t *object_); void send_term (class own_t *destination_, int linger_); @@ -89,10 +88,10 @@ namespace zmq virtual void process_own (class own_t *object_); virtual void process_attach (struct i_engine *engine_, const blob_t &peer_identity_); - virtual void process_bind (class reader_t *in_pipe_, - class writer_t *out_pipe_, const blob_t &peer_identity_); - virtual void process_activate_reader (); - virtual void process_activate_writer (uint64_t msgs_read_); + virtual void process_bind (class pipe_t *pipe_, + const blob_t &peer_identity_); + virtual void process_activate_read (); + virtual void process_activate_write (uint64_t msgs_read_); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); virtual void process_term_req (class own_t *object_); diff --git a/src/options.cpp b/src/options.cpp index 897e0f5..271ebdb 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -38,8 +38,6 @@ zmq::options_t::options_t () : reconnect_ivl_max (0), backlog (100), maxmsgsize (-1), - requires_in (false), - requires_out (false), immediate_connect (true) { } diff --git a/src/options.hpp b/src/options.hpp index 53d0197..fd39a74 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -75,11 +75,6 @@ namespace zmq // Maximal size of message to handle. int64_t maxmsgsize; - // These options are never set by the user directly. Instead they are - // provided by the specific socket type. - bool requires_in; - bool requires_out; - // If true, when connecting, pipes are created immediately without // waiting for the connection to be established. That way the socket // is not aware of the peer's identity, however, it is able to send diff --git a/src/own.cpp b/src/own.cpp index 4cbfdd6..cdf20a4 100644 --- a/src/own.cpp +++ b/src/own.cpp @@ -173,6 +173,7 @@ void zmq::own_t::process_term (int linger_) void zmq::own_t::register_term_acks (int count_) { term_acks += count_; + printf ("reg %d acks (%p, %d)\n", count_, (void*) this, term_acks); } void zmq::own_t::unregister_term_ack () @@ -180,6 +181,8 @@ void zmq::own_t::unregister_term_ack () zmq_assert (term_acks > 0); term_acks--; + printf ("unreg 1 acks (%p, %d)\n", (void*) this, term_acks); + // This may be a last ack we are waiting for before termination... check_term_acks (); } diff --git a/src/pair.cpp b/src/pair.cpp index d877b54..93a4327 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -25,111 +25,72 @@ zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) : socket_base_t (parent_, tid_), - inpipe (NULL), - outpipe (NULL), - inpipe_alive (false), - outpipe_alive (false), + pipe (NULL), terminating (false) { options.type = ZMQ_PAIR; - options.requires_in = true; - options.requires_out = true; } zmq::pair_t::~pair_t () { - zmq_assert (!inpipe); - zmq_assert (!outpipe); + zmq_assert (!pipe); } -void zmq::pair_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, - const blob_t &peer_identity_) +void zmq::pair_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { - zmq_assert (!inpipe && !outpipe); + zmq_assert (!pipe); - inpipe = inpipe_; - inpipe_alive = true; - inpipe->set_event_sink (this); - - outpipe = outpipe_; - outpipe_alive = true; - outpipe->set_event_sink (this); + pipe = pipe_; + pipe->set_event_sink (this); if (terminating) { - register_term_acks (2); - inpipe_->terminate (); - outpipe_->terminate (); + register_term_acks (1); + pipe_->terminate (); } } -void zmq::pair_t::terminated (reader_t *pipe_) -{ - zmq_assert (pipe_ == inpipe); - inpipe = NULL; - inpipe_alive = false; - - if (terminating) - unregister_term_ack (); -} - -void zmq::pair_t::terminated (writer_t *pipe_) +void zmq::pair_t::terminated (pipe_t *pipe_) { - zmq_assert (pipe_ == outpipe); - outpipe = NULL; - outpipe_alive = false; + zmq_assert (pipe_ == pipe); + pipe = NULL; if (terminating) unregister_term_ack (); } -void zmq::pair_t::delimited (reader_t *pipe_) -{ -} - void zmq::pair_t::process_term (int linger_) { terminating = true; - if (inpipe) { + if (pipe) { register_term_acks (1); - inpipe->terminate (); - } - - if (outpipe) { - register_term_acks (1); - outpipe->terminate (); + pipe->terminate (); } socket_base_t::process_term (linger_); } -void zmq::pair_t::activated (class reader_t *pipe_) +void zmq::pair_t::read_activated (pipe_t *pipe_) { - zmq_assert (!inpipe_alive); - inpipe_alive = true; + // There's just one pipe. No lists of active and inactive pipes. + // There's nothing to do here. } -void zmq::pair_t::activated (class writer_t *pipe_) +void zmq::pair_t::write_activated (pipe_t *pipe_) { - zmq_assert (!outpipe_alive); - outpipe_alive = true; + // There's just one pipe. No lists of active and inactive pipes. + // There's nothing to do here. } int zmq::pair_t::xsend (msg_t *msg_, int flags_) { - if (outpipe == NULL || !outpipe_alive) { - errno = EAGAIN; - return -1; - } - - if (!outpipe->write (msg_)) { - outpipe_alive = false; + if (!pipe || !pipe->write (msg_)) { errno = EAGAIN; return -1; } if (!(flags_ & ZMQ_SNDMORE)) - outpipe->flush (); + pipe->flush (); // Detach the original message from the data buffer. int rc = msg_->init (); @@ -144,14 +105,12 @@ int zmq::pair_t::xrecv (msg_t *msg_, int flags_) int rc = msg_->close (); errno_assert (rc == 0); - if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) { - - // No message is available. - inpipe_alive = false; + if (!pipe || !pipe->read (msg_)) { // Initialise the output parameter to be a 0-byte message. rc = msg_->init (); errno_assert (rc == 0); + errno = EAGAIN; return -1; } @@ -160,24 +119,23 @@ int zmq::pair_t::xrecv (msg_t *msg_, int flags_) bool zmq::pair_t::xhas_in () { - if (!inpipe || !inpipe_alive) + if (!pipe) return false; - inpipe_alive = inpipe->check_read (); - retu |