From acf0b0e515515e51ad32ba7a2d147ce703579478 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 22 May 2011 17:26:53 +0200 Subject: Introduces bi-directional pipes So far, there was a pair of unidirectional pipes between a socket and a session (or an inproc peer). This resulted in complex problems with half-closed states and tracking which inpipe corresponds to which outpipe. This patch doesn't add any functionality in itself, but is essential for further work on features like subscription forwarding. Signed-off-by: Martin Sustrik --- src/array.hpp | 79 +++++++++--- src/command.hpp | 11 +- src/connect_session.cpp | 1 + src/dist.cpp | 10 +- src/dist.hpp | 13 +- src/fq.cpp | 12 +- src/fq.hpp | 12 +- src/lb.cpp | 8 +- src/lb.hpp | 14 +- src/object.cpp | 40 +++--- src/object.hpp | 19 ++- src/options.cpp | 2 - src/options.hpp | 5 - src/own.cpp | 3 + src/pair.cpp | 98 ++++---------- src/pair.hpp | 24 +--- src/pipe.cpp | 335 +++++++++++++++++++++++------------------------- src/pipe.hpp | 201 ++++++++++++----------------- src/pull.cpp | 26 +++- src/pull.hpp | 13 +- src/push.cpp | 26 +++- src/push.hpp | 13 +- src/session.cpp | 167 ++++++++++-------------- src/session.hpp | 29 ++--- src/socket_base.cpp | 73 +++++------ src/socket_base.hpp | 12 +- src/xpub.cpp | 26 +++- src/xpub.hpp | 12 +- src/xrep.cpp | 90 +++++-------- src/xrep.hpp | 24 ++-- src/xreq.cpp | 28 +++- src/xreq.hpp | 13 +- src/xsub.cpp | 26 +++- src/xsub.hpp | 13 +- 34 files changed, 709 insertions(+), 769 deletions(-) diff --git a/src/array.hpp b/src/array.hpp index 1d18e48..e7b5266 100644 --- a/src/array.hpp +++ b/src/array.hpp @@ -28,14 +28,17 @@ namespace zmq { // Base class for objects stored in the array. Note that each object can - // be stored in at most one array. + // be stored in at most two arrays. This is needed specifically in the + // case where single pipe object is stored both in array of inbound pipes + // and in the array of outbound pipes. class array_item_t { public: inline array_item_t () : - array_index (-1) + array_index1 (-1), + array_index2 (-1) { } @@ -45,19 +48,30 @@ namespace zmq { } - inline void set_array_index (int index_) + inline void set_array_index1 (int index_) { - array_index = index_; + array_index1 = index_; } - inline int get_array_index () + inline int get_array_index1 () { - return array_index; + return array_index1; + } + + inline void set_array_index2 (int index_) + { + array_index2 = index_; + } + + inline int get_array_index2 () + { + return array_index2; } private: - int array_index; + int array_index1; + int array_index2; array_item_t (const array_item_t&); const array_item_t &operator = (const array_item_t&); @@ -65,9 +79,11 @@ namespace zmq // Fast array implementation with O(1) access to item, insertion and // removal. Array stores pointers rather than objects. The objects have - // to be derived from array_item_t class. + // to be derived from array_item_t class, thus they can be stored in + // two arrays. Template parameter N specifies which index in array_item_t + // to use. - template class array_t + template class array_t { public: @@ -98,28 +114,48 @@ namespace zmq inline void push_back (T *item_) { - if (item_) - item_->set_array_index ((int) items.size ()); + if (item_) { + if (N == 1) + item_->set_array_index1 ((int) items.size ()); + else + item_->set_array_index2 ((int) items.size ()); + } items.push_back (item_); } - inline void erase (T *item_) { - erase (item_->get_array_index ()); + inline void erase (T *item_) + { + if (N == 1) + erase (item_->get_array_index1 ()); + else + erase (item_->get_array_index2 ()); } inline void erase (size_type index_) { - if (items.back ()) - items.back ()->set_array_index ((int) index_); + if (items.back ()) { + if (N == 1) + items.back ()->set_array_index1 ((int) index_); + else + items.back ()->set_array_index2 ((int) index_); + } items [index_] = items.back (); items.pop_back (); } inline void swap (size_type index1_, size_type index2_) { - if (items [index1_]) - items [index1_]->set_array_index ((int) index2_); - if (items [index2_]) - items [index2_]->set_array_index ((int) index1_); + if (N == 1) { + if (items [index1_]) + items [index1_]->set_array_index1 ((int) index2_); + if (items [index2_]) + items [index2_]->set_array_index1 ((int) index1_); + } + else { + if (items [index1_]) + items [index1_]->set_array_index2 ((int) index2_); + if (items [index2_]) + items [index2_]->set_array_index2 ((int) index1_); + } std::swap (items [index1_], items [index2_]); } @@ -130,7 +166,10 @@ namespace zmq inline size_type index (T *item_) { - return (size_type) item_->get_array_index (); + if (N == 1) + return (size_type) item_->get_array_index1 (); + else + return (size_type) item_->get_array_index2 (); } private: diff --git a/src/command.hpp b/src/command.hpp index 35aed0f..ff7b551 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -40,8 +40,8 @@ namespace zmq own, attach, bind, - activate_reader, - activate_writer, + activate_read, + activate_write, pipe_term, pipe_term_ack, term_req, @@ -79,8 +79,7 @@ namespace zmq // Sent from session to socket to establish pipe(s) between them. // Caller have used inc_seqnum beforehand sending the command. struct { - class reader_t *in_pipe; - class writer_t *out_pipe; + class pipe_t *pipe; unsigned char peer_identity_size; unsigned char *peer_identity; } bind; @@ -88,13 +87,13 @@ namespace zmq // Sent by pipe writer to inform dormant pipe reader that there // are messages in the pipe. struct { - } activate_reader; + } activate_read; // Sent by pipe reader to inform pipe writer about how many // messages it has read so far. struct { uint64_t msgs_read; - } activate_writer; + } activate_write; // Sent by pipe reader to pipe writer to ask it to terminate // its end of the pipe. diff --git a/src/connect_session.cpp b/src/connect_session.cpp index c0951cb..9a29bf1 100644 --- a/src/connect_session.cpp +++ b/src/connect_session.cpp @@ -22,6 +22,7 @@ #include "zmq_connecter.hpp" #include "pgm_sender.hpp" #include "pgm_receiver.hpp" +#include "err.hpp" zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_, class socket_base_t *socket_, const options_t &options_, diff --git a/src/dist.cpp b/src/dist.cpp index 7c15bfd..3afa196 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -39,10 +39,8 @@ zmq::dist_t::~dist_t () zmq_assert (pipes.empty ()); } -void zmq::dist_t::attach (writer_t *pipe_) +void zmq::dist_t::attach (pipe_t *pipe_) { - pipe_->set_event_sink (this); - // If we are in the middle of sending a message, we'll add new pipe // into the list of eligible pipes. Otherwise we add it to the list // of active pipes. @@ -74,7 +72,7 @@ void zmq::dist_t::terminate () pipes [i]->terminate (); } -void zmq::dist_t::terminated (writer_t *pipe_) +void zmq::dist_t::terminated (pipe_t *pipe_) { // Remove the pipe from the list; adjust number of active and/or // eligible pipes accordingly. @@ -88,7 +86,7 @@ void zmq::dist_t::terminated (writer_t *pipe_) sink->unregister_term_ack (); } -void zmq::dist_t::activated (writer_t *pipe_) +void zmq::dist_t::activated (pipe_t *pipe_) { // Move the pipe from passive to eligible state. pipes.swap (pipes.index (pipe_), eligible); @@ -153,7 +151,7 @@ bool zmq::dist_t::has_out () return true; } -bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) +bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_) { if (!pipe_->write (msg_)) { pipes.swap (pipes.index (pipe_), active - 1); diff --git a/src/dist.hpp b/src/dist.hpp index fd522b9..c137332 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -31,33 +31,32 @@ namespace zmq // Class manages a set of outbound pipes. It sends each messages to // each of them. - class dist_t : public i_writer_events + class dist_t { public: dist_t (class own_t *sink_); ~dist_t (); - void attach (writer_t *pipe_); + void attach (class pipe_t *pipe_); void terminate (); int send (class msg_t *msg_, int flags_); bool has_out (); - // i_writer_events interface implementation. - void activated (writer_t *pipe_); - void terminated (writer_t *pipe_); + void activated (class pipe_t *pipe_); + void terminated (class pipe_t *pipe_); private: // Write the message to the pipe. Make the pipe inactive if writing // fails. In such a case false is returned. - bool write (class writer_t *pipe_, class msg_t *msg_); + bool write (class pipe_t *pipe_, class msg_t *msg_); // Put the message to all active pipes. void distribute (class msg_t *msg_, int flags_); // List of outbound pipes. - typedef array_t pipes_t; + typedef array_t pipes_t; pipes_t pipes; // Number of active pipes. All the active pipes are located at the diff --git a/src/fq.cpp b/src/fq.cpp index 392e554..b4ee641 100644 --- a/src/fq.cpp +++ b/src/fq.cpp @@ -38,10 +38,8 @@ zmq::fq_t::~fq_t () zmq_assert (pipes.empty ()); } -void zmq::fq_t::attach (reader_t *pipe_) +void zmq::fq_t::attach (pipe_t *pipe_) { - pipe_->set_event_sink (this); - pipes.push_back (pipe_); pipes.swap (active, pipes.size () - 1); active++; @@ -53,7 +51,7 @@ void zmq::fq_t::attach (reader_t *pipe_) } } -void zmq::fq_t::terminated (reader_t *pipe_) +void zmq::fq_t::terminated (pipe_t *pipe_) { // Make sure that we are not closing current pipe while // message is half-read. @@ -72,10 +70,6 @@ void zmq::fq_t::terminated (reader_t *pipe_) sink->unregister_term_ack (); } -void zmq::fq_t::delimited (reader_t *pipe_) -{ -} - void zmq::fq_t::terminate () { zmq_assert (!terminating); @@ -86,7 +80,7 @@ void zmq::fq_t::terminate () pipes [i]->terminate (); } -void zmq::fq_t::activated (reader_t *pipe_) +void zmq::fq_t::activated (pipe_t *pipe_) { // Move the pipe to the list of active pipes. pipes.swap (pipes.index (pipe_), active); diff --git a/src/fq.hpp b/src/fq.hpp index c35d458..bbe1b59 100644 --- a/src/fq.hpp +++ b/src/fq.hpp @@ -31,28 +31,26 @@ namespace zmq // Class manages a set of inbound pipes. On receive it performs fair // queueing (RFC970) so that senders gone berserk won't cause denial of // service for decent senders. - class fq_t : public i_reader_events + class fq_t { public: fq_t (class own_t *sink_); ~fq_t (); - void attach (reader_t *pipe_); + void attach (pipe_t *pipe_); void terminate (); int recv (msg_t *msg_, int flags_); bool has_in (); - // i_reader_events implementation. - void activated (reader_t *pipe_); - void terminated (reader_t *pipe_); - void delimited (reader_t *pipe_); + void activated (pipe_t *pipe_); + void terminated (pipe_t *pipe_); private: // Inbound pipes. - typedef array_t pipes_t; + typedef array_t pipes_t; pipes_t pipes; // Number of active pipes. All the active pipes are located at the diff --git a/src/lb.cpp b/src/lb.cpp index 8eb9157..2ba902a 100644 --- a/src/lb.cpp +++ b/src/lb.cpp @@ -39,10 +39,8 @@ zmq::lb_t::~lb_t () zmq_assert (pipes.empty ()); } -void zmq::lb_t::attach (writer_t *pipe_) +void zmq::lb_t::attach (pipe_t *pipe_) { - pipe_->set_event_sink (this); - pipes.push_back (pipe_); pipes.swap (active, pipes.size () - 1); active++; @@ -63,7 +61,7 @@ void zmq::lb_t::terminate () pipes [i]->terminate (); } -void zmq::lb_t::terminated (writer_t *pipe_) +void zmq::lb_t::terminated (pipe_t *pipe_) { pipes_t::size_type index = pipes.index (pipe_); @@ -85,7 +83,7 @@ void zmq::lb_t::terminated (writer_t *pipe_) sink->unregister_term_ack (); } -void zmq::lb_t::activated (writer_t *pipe_) +void zmq::lb_t::activated (pipe_t *pipe_) { // Move the pipe to the list of active pipes. pipes.swap (pipes.index (pipe_), active); diff --git a/src/lb.hpp b/src/lb.hpp index f844b01..d764f6d 100644 --- a/src/lb.hpp +++ b/src/lb.hpp @@ -27,28 +27,28 @@ namespace zmq { - // Class manages a set of outbound pipes. On send it load balances + // This class manages a set of outbound pipes. On send it load balances // messages fairly among the pipes. - class lb_t : public i_writer_events + + class lb_t { public: lb_t (class own_t *sink_); ~lb_t (); - void attach (writer_t *pipe_); + void attach (pipe_t *pipe_); void terminate (); int send (msg_t *msg_, int flags_); bool has_out (); - // i_writer_events interface implementation. - void activated (writer_t *pipe_); - void terminated (writer_t *pipe_); + void activated (pipe_t *pipe_); + void terminated (pipe_t *pipe_); private: // List of outbound pipes. - typedef array_t pipes_t; + typedef array_t pipes_t; pipes_t pipes; // Number of active pipes. All the active pipes are located at the diff --git a/src/object.cpp b/src/object.cpp index e2ca6d6..0a06d5f 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -59,12 +59,12 @@ void zmq::object_t::process_command (command_t &cmd_) { switch (cmd_.type) { - case command_t::activate_reader: - process_activate_reader (); + case command_t::activate_read: + process_activate_read (); break; - case command_t::activate_writer: - process_activate_writer (cmd_.args.activate_writer.msgs_read); + case command_t::activate_write: + process_activate_write (cmd_.args.activate_write.msgs_read); break; case command_t::stop: @@ -90,8 +90,8 @@ void zmq::object_t::process_command (command_t &cmd_) break; case command_t::bind: - process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe, - cmd_.args.bind.peer_identity ? blob_t (cmd_.args.bind.peer_identity, + process_bind (cmd_.args.bind.pipe, cmd_.args.bind.peer_identity ? + blob_t (cmd_.args.bind.peer_identity, cmd_.args.bind.peer_identity_size) : blob_t ()); process_seqnum (); break; @@ -236,8 +236,8 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, send_command (cmd); } -void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_, - writer_t *out_pipe_, const blob_t &peer_identity_, bool inc_seqnum_) +void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_, + const blob_t &peer_identity_, bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -248,8 +248,7 @@ void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_, #endif cmd.destination = destination_; cmd.type = command_t::bind; - cmd.args.bind.in_pipe = in_pipe_; - cmd.args.bind.out_pipe = out_pipe_; + cmd.args.bind.pipe = pipe_; if (peer_identity_.empty ()) { cmd.args.bind.peer_identity_size = 0; cmd.args.bind.peer_identity = NULL; @@ -267,18 +266,18 @@ void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_, send_command (cmd); } -void zmq::object_t::send_activate_reader (reader_t *destination_) +void zmq::object_t::send_activate_read (pipe_t *destination_) { command_t cmd; #if defined ZMQ_MAKE_VALGRIND_HAPPY memset (&cmd, 0, sizeof (cmd)); #endif cmd.destination = destination_; - cmd.type = command_t::activate_reader; + cmd.type = command_t::activate_read; send_command (cmd); } -void zmq::object_t::send_activate_writer (writer_t *destination_, +void zmq::object_t::send_activate_write (pipe_t *destination_, uint64_t msgs_read_) { command_t cmd; @@ -286,12 +285,12 @@ void zmq::object_t::send_activate_writer (writer_t *destination_, memset (&cmd, 0, sizeof (cmd)); #endif cmd.destination = destination_; - cmd.type = command_t::activate_writer; - cmd.args.activate_writer.msgs_read = msgs_read_; + cmd.type = command_t::activate_write; + cmd.args.activate_write.msgs_read = msgs_read_; send_command (cmd); } -void zmq::object_t::send_pipe_term (writer_t *destination_) +void zmq::object_t::send_pipe_term (pipe_t *destination_) { command_t cmd; #if defined ZMQ_MAKE_VALGRIND_HAPPY @@ -302,7 +301,7 @@ void zmq::object_t::send_pipe_term (writer_t *destination_) send_command (cmd); } -void zmq::object_t::send_pipe_term_ack (reader_t *destination_) +void zmq::object_t::send_pipe_term_ack (pipe_t *destination_) { command_t cmd; #if defined ZMQ_MAKE_VALGRIND_HAPPY @@ -404,18 +403,17 @@ void zmq::object_t::process_attach (i_engine *engine_, zmq_assert (false); } -void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, - const blob_t &peer_identity_) +void zmq::object_t::process_bind (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (false); } -void zmq::object_t::process_activate_reader () +void zmq::object_t::process_activate_read () { zmq_assert (false); } -void zmq::object_t::process_activate_writer (uint64_t msgs_read_) +void zmq::object_t::process_activate_write (uint64_t msgs_read_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index 0f5e61b..0f47670 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -66,14 +66,13 @@ namespace zmq void send_attach (class session_t *destination_, struct i_engine *engine_, const blob_t &peer_identity_, bool inc_seqnum_ = true); - void send_bind (class own_t *destination_, - class reader_t *in_pipe_, class writer_t *out_pipe_, + void send_bind (class own_t *destination_, class pipe_t *pipe_, const blob_t &peer_identity_, bool inc_seqnum_ = true); - void send_activate_reader (class reader_t *destination_); - void send_activate_writer (class writer_t *destination_, + void send_activate_read (class pipe_t *destination_); + void send_activate_write (class pipe_t *destination_, uint64_t msgs_read_); - void send_pipe_term (class writer_t *destination_); - void send_pipe_term_ack (class reader_t *destination_); + void send_pipe_term (class pipe_t *destination_); + void send_pipe_term_ack (class pipe_t *destination_); void send_term_req (class own_t *destination_, class own_t *object_); void send_term (class own_t *destination_, int linger_); @@ -89,10 +88,10 @@ namespace zmq virtual void process_own (class own_t *object_); virtual void process_attach (struct i_engine *engine_, const blob_t &peer_identity_); - virtual void process_bind (class reader_t *in_pipe_, - class writer_t *out_pipe_, const blob_t &peer_identity_); - virtual void process_activate_reader (); - virtual void process_activate_writer (uint64_t msgs_read_); + virtual void process_bind (class pipe_t *pipe_, + const blob_t &peer_identity_); + virtual void process_activate_read (); + virtual void process_activate_write (uint64_t msgs_read_); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); virtual void process_term_req (class own_t *object_); diff --git a/src/options.cpp b/src/options.cpp index 897e0f5..271ebdb 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -38,8 +38,6 @@ zmq::options_t::options_t () : reconnect_ivl_max (0), backlog (100), maxmsgsize (-1), - requires_in (false), - requires_out (false), immediate_connect (true) { } diff --git a/src/options.hpp b/src/options.hpp index 53d0197..fd39a74 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -75,11 +75,6 @@ namespace zmq // Maximal size of message to handle. int64_t maxmsgsize; - // These options are never set by the user directly. Instead they are - // provided by the specific socket type. - bool requires_in; - bool requires_out; - // If true, when connecting, pipes are created immediately without // waiting for the connection to be established. That way the socket // is not aware of the peer's identity, however, it is able to send diff --git a/src/own.cpp b/src/own.cpp index 4cbfdd6..cdf20a4 100644 --- a/src/own.cpp +++ b/src/own.cpp @@ -173,6 +173,7 @@ void zmq::own_t::process_term (int linger_) void zmq::own_t::register_term_acks (int count_) { term_acks += count_; + printf ("reg %d acks (%p, %d)\n", count_, (void*) this, term_acks); } void zmq::own_t::unregister_term_ack () @@ -180,6 +181,8 @@ void zmq::own_t::unregister_term_ack () zmq_assert (term_acks > 0); term_acks--; + printf ("unreg 1 acks (%p, %d)\n", (void*) this, term_acks); + // This may be a last ack we are waiting for before termination... check_term_acks (); } diff --git a/src/pair.cpp b/src/pair.cpp index d877b54..93a4327 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -25,111 +25,72 @@ zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) : socket_base_t (parent_, tid_), - inpipe (NULL), - outpipe (NULL), - inpipe_alive (false), - outpipe_alive (false), + pipe (NULL), terminating (false) { options.type = ZMQ_PAIR; - options.requires_in = true; - options.requires_out = true; } zmq::pair_t::~pair_t () { - zmq_assert (!inpipe); - zmq_assert (!outpipe); + zmq_assert (!pipe); } -void zmq::pair_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, - const blob_t &peer_identity_) +void zmq::pair_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { - zmq_assert (!inpipe && !outpipe); + zmq_assert (!pipe); - inpipe = inpipe_; - inpipe_alive = true; - inpipe->set_event_sink (this); - - outpipe = outpipe_; - outpipe_alive = true; - outpipe->set_event_sink (this); + pipe = pipe_; + pipe->set_event_sink (this); if (terminating) { - register_term_acks (2); - inpipe_->terminate (); - outpipe_->terminate (); + register_term_acks (1); + pipe_->terminate (); } } -void zmq::pair_t::terminated (reader_t *pipe_) -{ - zmq_assert (pipe_ == inpipe); - inpipe = NULL; - inpipe_alive = false; - - if (terminating) - unregister_term_ack (); -} - -void zmq::pair_t::terminated (writer_t *pipe_) +void zmq::pair_t::terminated (pipe_t *pipe_) { - zmq_assert (pipe_ == outpipe); - outpipe = NULL; - outpipe_alive = false; + zmq_assert (pipe_ == pipe); + pipe = NULL; if (terminating) unregister_term_ack (); } -void zmq::pair_t::delimited (reader_t *pipe_) -{ -} - void zmq::pair_t::process_term (int linger_) { terminating = true; - if (inpipe) { + if (pipe) { register_term_acks (1); - inpipe->terminate (); - } - - if (outpipe) { - register_term_acks (1); - outpipe->terminate (); + pipe->terminate (); } socket_base_t::process_term (linger_); } -void zmq::pair_t::activated (class reader_t *pipe_) +void zmq::pair_t::read_activated (pipe_t *pipe_) { - zmq_assert (!inpipe_alive); - inpipe_alive = true; + // There's just one pipe. No lists of active and inactive pipes. + // There's nothing to do here. } -void zmq::pair_t::activated (class writer_t *pipe_) +void zmq::pair_t::write_activated (pipe_t *pipe_) { - zmq_assert (!outpipe_alive); - outpipe_alive = true; + // There's just one pipe. No lists of active and inactive pipes. + // There's nothing to do here. } int zmq::pair_t::xsend (msg_t *msg_, int flags_) { - if (outpipe == NULL || !outpipe_alive) { - errno = EAGAIN; - return -1; - } - - if (!outpipe->write (msg_)) { - outpipe_alive = false; + if (!pipe || !pipe->write (msg_)) { errno = EAGAIN; return -1; } if (!(flags_ & ZMQ_SNDMORE)) - outpipe->flush (); + pipe->flush (); // Detach the original message from the data buffer. int rc = msg_->init (); @@ -144,14 +105,12 @@ int zmq::pair_t::xrecv (msg_t *msg_, int flags_) int rc = msg_->close (); errno_assert (rc == 0); - if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) { - - // No message is available. - inpipe_alive = false; + if (!pipe || !pipe->read (msg_)) { // Initialise the output parameter to be a 0-byte message. rc = msg_->init (); errno_assert (rc == 0); + errno = EAGAIN; return -1; } @@ -160,24 +119,23 @@ int zmq::pair_t::xrecv (msg_t *msg_, int flags_) bool zmq::pair_t::xhas_in () { - if (!inpipe || !inpipe_alive) + if (!pipe) return false; - inpipe_alive = inpipe->check_read (); - return inpipe_alive; + return pipe->check_read (); } bool zmq::pair_t::xhas_out () { - if (!outpipe || !outpipe_alive) + if (!pipe) return false; msg_t msg; int rc = msg.init (); errno_assert (rc == 0); - outpipe_alive = outpipe->check_write (&msg); + bool result = pipe->check_write (&msg); rc = msg.close (); errno_assert (rc == 0); - return outpipe_alive; + return result; } diff --git a/src/pair.hpp b/src/pair.hpp index a10e15a..2cb050a 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -29,8 +29,7 @@ namespace zmq class pair_t : public socket_base_t, - public i_reader_events, - public i_writer_events + public i_pipe_events { public: @@ -38,32 +37,23 @@ namespace zmq ~pair_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, - const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); int xsend (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); - // i_reader_events interface implementation. - void activated (class reader_t *pipe_); - void terminated (class reader_t *pipe_); - void delimited (class reader_t *pipe_); - - // i_writer_events interface implementation. - void activated (class writer_t *pipe_); - void terminated (class writer_t *pipe_); + // i_pipe_events interface implementation. + void read_activated (class pipe_t *pipe_); + void write_activated (class pipe_t *pipe_); + void terminated (class pipe_t *pipe_); private: // Hook into termination process. void process_term (int linger_); - class reader_t *inpipe; - class writer_t *outpipe; - - bool inpipe_alive; - bool outpipe_alive; + class pipe_t *pipe; bool terminating; diff --git a/src/pipe.cpp b/src/pipe.cpp index 36dc808..fb03042 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -19,100 +19,123 @@ */ #include +#include #include "pipe.hpp" -#include "likely.hpp" +#include "err.hpp" -zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, int lwm_) : +int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], + int hwms_ [2], bool delays_ [2]) +{ + // Creates two pipe objects. These objects are connected by two ypipes, + // each to pass messages in one direction. + + pipe_t::upipe_t *upipe1 = new (std::nothrow) pipe_t::upipe_t (); + alloc_assert (upipe1); + pipe_t::upipe_t *upipe2 = new (std::nothrow) pipe_t::upipe_t (); + alloc_assert (upipe2); + + pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, + hwms_ [1], hwms_ [0], delays_ [0]); + alloc_assert (pipes_ [0]); + pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, + hwms_ [0], hwms_ [1], delays_ [1]); + alloc_assert (pipes_ [1]); + + pipes_ [0]->set_peer (pipes_ [1]); + pipes_ [1]->set_peer (pipes_ [0]); + + return 0; +} + +zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, + int inhwm_, int outhwm_, bool delay_) : object_t (parent_), - active (true), - pipe (pipe_), - writer (NULL), - lwm (lwm_), + inpipe (inpipe_), + outpipe (outpipe_), + in_active (true), + out_active (true), + hwm (outhwm_), + lwm (compute_lwm (inhwm_)), msgs_read (0), + msgs_written (0), + peers_msgs_read (0), + peer (NULL), sink (NULL), - terminating (false) + terminating (false), + term_recvd (false), + delimited (false), + delay (delay_) { - // Note that writer is not set here. Writer will inform reader about its - // address once it is created (via set_writer method). } -void zmq::reader_t::set_writer (writer_t *writer_) +zmq::pipe_t::~pipe_t () { - zmq_assert (!writer); - writer = writer_; } -zmq::reader_t::~reader_t () +void zmq::pipe_t::set_peer (pipe_t *peer_) { - // Pipe as such is owned and deallocated by reader object. - // The point is that reader processes the last step of terminal - // handshaking (term_ack). - zmq_assert (pipe); - - // First delete all the unread messages in the pipe. We have to do it by - // hand because msg_t doesn't have automatic destructor. - msg_t msg; - while (pipe->read (&msg)) { - int rc = msg.close (); - errno_assert (rc == 0); - } - - delete pipe; + // Peer can be set once only. + zmq_assert (!peer); + peer = peer_; } -void zmq::reader_t::set_event_sink (i_reader_events *sink_) +void zmq::pipe_t::set_event_sink (i_pipe_events *sink_) { + // Sink can be set once only. zmq_assert (!sink); sink = sink_; } -bool zmq::reader_t::is_delimiter (msg_t &msg_) -{ - return msg_.is_delimiter (); -} - -bool zmq::reader_t::check_read () +bool zmq::pipe_t::check_read () { - if (!active) + if (unlikely (!in_active)) return false; // Check if there's an item in the pipe. - if (!pipe->check_read ()) { - active = false; + if (!inpipe->check_read ()) { + in_active = false; return false; } // If the next item in the pipe is message delimiter, - // initiate its termination. - if (pipe->probe (is_delimiter)) { + // initiate termination process. + if (inpipe->probe (is_delimiter)) { msg_t msg; - bool ok = pipe->read (&msg); + bool ok = inpipe->read (&msg); zmq_assert (ok); - if (sink) - sink->delimited (this); - terminate (); + delimited = true; + + // If pipe_term was already received but wasn't processed because + // of pending messages, we can ack it now. + if (terminating) + send_pipe_term_ack (peer); + return false; } return true; } -bool zmq::reader_t::read (msg_t *msg_) +bool zmq::pipe_t::read (msg_t *msg_) { - if (!active) + if (unlikely (!in_active)) return false; - if (!pipe->read (msg_)) { - active = false; + if (!inpipe->read (msg_)) { + in_active = false; return false; } // If delimiter was read, start termination process of the pipe. if (msg_->is_delimiter ()) { - if (sink) - sink->delimited (this); - terminate (); + delimited = true; + + // If pipe_term was already received but wasn't processed because + // of pending messages, we can ack it now. + if (terminating) + send_pipe_term_ack (peer); + return false; } @@ -120,175 +143,148 @@ bool zmq::reader_t::read (msg_t *msg_) msgs_read++; if (lwm > 0 && msgs_read % lwm == 0) - send_activate_writer (writer, msgs_read); + send_activate_write (peer, msgs_read); return true; } -void zmq::reader_t::terminate () +bool zmq::pipe_t::check_write (msg_t *msg_) { - // If termination was already started by the peer, do nothing. - if (terminating) - return; + if (unlikely (!out_active)) + return false; - active = false; - terminating = true; - send_pipe_term (writer); -} + bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm); -void zmq::reader_t::process_activate_reader () -{ - // Forward the event to the sink (either socket or session). - active = true; - sink->activated (this); + if (unlikely (full)) { + out_active = false; + return false; + } + + return true; } -void zmq::reader_t::process_pipe_term_ack () +bool zmq::pipe_t::write (msg_t *msg_) { - // At this point writer may already be deallocated. - // For safety's sake drop the reference to it. - writer = NULL; + if (unlikely (!check_write (msg_))) + return false; - // Notify owner about the termination. - zmq_assert (sink); - sink->terminated (this); + outpipe->write (*msg_, msg_->flags () & msg_t::more); + if (!(msg_->flags () & msg_t::more)) + msgs_written++; - // Deallocate resources. - delete this; + return true; } -zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_, - int hwm_) : - object_t (parent_), - active (true), - pipe (pipe_), - reader (reader_), - hwm (hwm_), - msgs_read (0), - msgs_written (0), - sink (NULL), - terminating (false) +void zmq::pipe_t::rollback () { - // Inform reader about the writer. - reader->set_writer (this); + // Remove incomplete message from the outbound pipe. + msg_t msg; + while (outpipe->unwrite (&msg)) { + zmq_assert (msg.flags () & msg_t::more); + int rc = msg.close (); + errno_assert (rc == 0); + } } -zmq::writer_t::~writer_t () +void zmq::pipe_t::flush () { + if (!outpipe->flush ()) + send_activate_read (peer); } -void zmq::writer_t::set_event_sink (i_writer_events *sink_) +void zmq::pipe_t::process_activate_read () { - zmq_assert (!sink); - sink = sink_; + if (!in_active && !terminating) { + in_active = true; + sink->read_activated (this); + } } -bool zmq::writer_t::check_write (msg_t *msg_) +void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) { - // We've already checked and there's no space free for the new message. - // There's no point in checking once again. - if (unlikely (!active)) - return false; + // Remember the peers's message sequence number. + peers_msgs_read = msgs_read_; - if (unlikely (pipe_full ())) { - active = false; - return false; + if (!out_active && !terminating) { + out_active = true; + sink->write_activated (this); } - - return true; } -bool zmq::writer_t::write (msg_t *msg_) +void zmq::pipe_t::process_pipe_term () { - if (unlikely (!check_write (msg_))) - return false; - - pipe->write (*msg_, msg_->flags () & msg_t::more); - if (!(msg_->flags () & msg_t::more)) - msgs_written++; - - return true; + term_recvd = true; + + // We can proceed with the termination if one of the following is true: + // 1. User asked this side of pipe to terminate already. + // 2. Waiting for pending messages in not required. + // 3. Delimiter was already received. + if (terminating || !delay || delimited) { + terminating = true; + send_pipe_term_ack (peer); + } } -void zmq::writer_t::rollback () +void zmq::pipe_t::process_pipe_term_ack () { - // Remove incomplete message from the pipe. + // Notify the user that all the references to the pipe should be dropped. + zmq_assert (sink); + sink->terminated (this); + + // If the peer haven't asked for the termination itself, we have to + // ack the ack, so that it can deallocate properly. + if (!term_recvd) + send_pipe_term_ack (peer); + + // We'll deallocate the inbound pipe, the peer will deallocate the outbound + // pipe (which is an inbound pipe from its point of view). + // First, delete all the unread messages in the pipe. We have to do it by + // hand because msg_t doesn't have automatic destructor. Then deallocate + // the ypipe itself. msg_t msg; - while (pipe->unwrite (&msg)) { - zmq_assert (msg.flags () & msg_t::more); - int rc = msg.close (); - errno_assert (rc == 0); + while (inpipe->read (&msg)) { + int rc = msg.close (); + errno_assert (rc == 0); } -} + delete inpipe; -void zmq::writer_t::flush () -{ - if (!pipe->flush ()) - send_activate_reader (reader); + // Deallocate the pipe object + delete this; } -void zmq::writer_t::terminate () +void zmq::pipe_t::terminate () { // Prevent double termination. if (terminating) return; terminating = true; - // Mark the pipe as not available for writing. - active = false; + // Stop inbound and outbound flow of messages. + in_active = false; + out_active = false; - // Rollback any unfinished messages. + // Rollback any unfinished outbound messages. rollback (); - // Push delimiter into the pipe. Trick the compiler to belive that - // the tag is a valid pointer. Note that watermarks are not checked - // thus the delimiter can be written even though the pipe is full. + // Push delimiter into the outbound pipe. Note that watermarks are not + // checked thus the delimiter can be written even though the pipe is full. msg_t msg; msg.init_delimiter (); - pipe->write (msg, false); + outpipe->write (msg, false); flush (); -} - -void zmq::writer_t::process_activate_writer (uint64_t msgs_read_) -{ - // Store the reader's message sequence number. - msgs_read = msgs_read_; - - // If the writer was non-active before, let's make it active - // (available for writing messages to). - if (!active && !terminating) { - active = true; - zmq_assert (sink); - sink->activated (this); - } -} - -void zmq::writer_t::process_pipe_term () -{ - send_pipe_term_ack (reader); - - // The above command allows reader to deallocate itself and the pipe. - // For safety's sake we'll drop the pointers here. - reader = NULL; - pipe = NULL; - // Notify owner about the termination. - zmq_assert (sink); - sink->terminated (this); - - // Deallocate the resources. - delete this; + // Start the termination handshaking. + send_pipe_term (peer); } -bool zmq::writer_t::pipe_full () +bool zmq::pipe_t::is_delimiter (msg_t &msg_) { - return hwm > 0 && msgs_written - msgs_read == uint64_t (hwm); + return msg_.is_delimiter (); } -void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_, - int hwm_, reader_t **reader_, writer_t **writer_) +int zmq::pipe_t::compute_lwm (int hwm_) { - // First compute the low water mark. Following point should be taken + // Compute the low water mark. Following point should be taken // into consideration: // // 1. LWM has to be less than HWM. @@ -308,17 +304,8 @@ void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_, // That done, we still we have to account for the cases where // HWM < max_wm_delta thus driving LWM to negative numbers. // Let's make LWM 1/2 of HWM in such cases. - int lwm = (hwm_ > max_wm_delta * 2) ? + int result = (hwm_ > max_wm_delta * 2) ? hwm_ - max_wm_delta : (hwm_ + 1) / 2; - // Create all three objects pipe consists of: the pipe per se, reader and - // writer. The pipe will be handled by reader and writer, its never passed - // to the user. Reader and writer are returned to the user. - pipe_t *pipe = new (std::nothrow) pipe_t (); - alloc_assert (pipe); - *reader_ = new (std::nothrow) reader_t (reader_parent_, pipe, lwm); - alloc_assert (*reader_); - *writer_ = new (std::nothrow) writer_t (writer_parent_, pipe, *reader_, - hwm_); - alloc_assert (*writer_); + return result; } diff --git a/src/pipe.hpp b/src/pipe.hpp index 75b5c47..fcba877 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -22,47 +22,43 @@ #define __ZMQ_PIPE_HPP_INCLUDED__ #include "msg.hpp" -#include "array.hpp" #include "ypipe.hpp" #include "config.hpp" #include "object.hpp" -#include "stdint.hpp" +#include "array.hpp" namespace zmq { - // Creates a pipe. Returns pointer to reader and writer objects. - void create_pipe (object_t *reader_parent_, object_t *writer_parent_, - int hwm_, class reader_t **reader_, class writer_t **writer_); - - // The shutdown mechanism for pipe works as follows: Either endpoint - // (or even both of them) can ask pipe to terminate by calling 'terminate' - // method. Pipe then terminates in asynchronous manner. When the part of - // the shutdown tied to the endpoint is done it triggers 'terminated' - // event. When endpoint processes the event and returns, associated - // reader/writer object is deallocated. - - typedef ypipe_t pipe_t; + // Create a pipepair for bi-directional transfer of messages. + // First HWM is for messages passed from first pipe to the second pipe. + // Second HWM is for messages passed from second pipe to the first pipe. + // Delay specifies whether the pipe receives all the pending messages + // before terminating or whether it terminates straight away. + int pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], + int hwms_ [2], bool delays_ [2]); - struct i_reader_events + struct i_pipe_events { - virtual ~i_reader_events () {} + virtual ~i_pipe_events () {} - virtual void terminated (class reader_t *pipe_) = 0; - virtual void activated (class reader_t *pipe_) = 0; - virtual void delimited (class reader_t *pipe_) = 0; + virtual void read_activated (class pipe_t *pipe_) = 0; + virtual void write_activated (class pipe_t *pipe_) = 0; + virtual void terminated (class pipe_t *pipe_) = 0; }; - class reader_t : public object_t, public array_item_t + class pipe_t : + public object_t, + public array_item_t { - friend void create_pipe (object_t*, object_t*, int, - reader_t**, writer_t**); - friend class writer_t; + // This allows pipepair to create pipe objects. + friend int pipepair (class object_t *parents_ [2], + class pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]); public: - // Specifies the object to get events from the reader. - void set_event_sink (i_reader_events *endpoint_); + // Specifies the object to send events to. + void set_event_sink (i_pipe_events *sink_); // Returns true if there is at least one message to read in the pipe. bool check_read (); @@ -70,127 +66,100 @@ namespace zmq // Reads a message to the underlying pipe. bool read (msg_t *msg_); - // Ask pipe to terminate. - void terminate (); - - private: - - reader_t (class object_t *parent_, pipe_t *pipe_, int lwm_); - ~reader_t (); - - // To be called only by writer itself! - void set_writer (class writer_t *writer_); - - // Command handlers. - void process_activate_reader (); - void process_pipe_term_ack (); - - // Returns true if the message is delimiter; false otherwise. - static bool is_delimiter (msg_t &msg_); - - // True, if pipe can be read from. - bool active; - - // The underlying pipe. - pipe_t *pipe; - - // Pipe writer associated with the other side of the pipe. - class writer_t *writer; - - // Low watermark for in-memory storage (in bytes). - int lwm; - - // Number of messages read so far. - uint64_t msgs_read; - - // Sink for the events (either the socket of the session). - i_reader_events *sink; - - // True is 'terminate' method was called or delimiter - // was read from the pipe. - bool terminating; - - reader_t (const reader_t&); - const reader_t &operator = (const reader_t&); - }; - - struct i_writer_events - { - virtual ~i_writer_events () {} - - virtual void terminated (class writer_t *pipe_) = 0; - virtual void activated (class writer_t *pipe_) = 0; - }; - - class writer_t : public object_t, public array_item_t - { - friend void create_pipe (object_t*, object_t*, int, - reader_t**, writer_t**); - - public: - - // Specifies the object to get events from the writer. - void set_event_sink (i_writer_events *endpoint_); - - // Checks whether messages can be written to the pipe. - // If writing the message would cause high watermark - // the function returns false. + // Checks whether messages can be written to the pipe. If writing + // the message would cause high watermark the function returns false. bool check_write (msg_t *msg_); // Writes a message to the underlying pipe. Returns false if the // message cannot be written because high watermark was reached. bool write (msg_t *msg_); - // Remove unfinished part of a message from the pipe. + // Remove unfinished parts of the outbound message from the pipe. void rollback (); // Flush the messages downsteam. void flush (); - // Ask pipe to terminate. + // Ask pipe to terminate. The termination will happen asynchronously + // and user will be notified about actual deallocation by 'terminated' + // event. void terminate (); private: - writer_t (class object_t *parent_, pipe_t *pipe_, reader_t *reader_, - int hwm_); - ~writer_t (); - // Command handlers. - void process_activate_writer (uint64_t msgs_read_); + void process_activate_read (); + void process_activate_write (uint64_t msgs_read_); void process_pipe_term (); + void process_pipe_term_ack (); + + // Type of the underlying lock-free pipe. + typedef ypipe_t upipe_t; - // Tests whether underlying pipe is already full. - bool pipe_full (); + // Constructor is private. Pipe can only be created using + // pipepair function. + pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, + int inhwm_, int outhwm_, bool delay_); - // True, if this object can be written to. - bool active; + // Pipepair uses this function to let us know about + // the peer pipe object. + void set_peer (pipe_t *pipe_); - // The underlying pipe. - pipe_t *pipe; + // Destructor is private. Pipe objects destroy themselves. + ~pipe_t (); - // Pipe reader associated with the other side of the pipe. - reader_t *reader; + // Underlying pipes for both directions. + upipe_t *inpipe; + upipe_t *outpipe; - // High watermark for in-memory storage (in bytes). + // Can the pipe be read from / written to? + bool in_active; + bool out_active; + + // High watermark for the outbound pipe. int hwm; - // Last confirmed number of messages read from the pipe. - // The actual number can be higher. - uint64_t msgs_read; + // Low watermark for the inbound pipe. + int lwm; - // Number of messages we have written so far. + // Number of messages read and written so far. + uint64_t msgs_read; uint64_t msgs_written; - // Sink for the events (either the socket or the session). - i_writer_events *sink; + // Last received peer's msgs_read. The actual number in the peer + // can be higher at the moment. + uint64_t peers_msgs_read; - // True is 'terminate' method was called of 'pipe_term' command - // arrived from the reader. + // The pipe object on the other side of the pipepair. + pipe_t *peer; + + // Sink to send events to. + i_pipe_events *sink; + + // True is 'terminate' method was called or termination request + // was received from the peer. bool terminating; - writer_t (const writer_t&); - const writer_t &operator = (const writer_t&); + // True is we've already got pipe_term command from the peer. + bool term_recvd; + + // True if delimiter was already received from the peer. + bool delimited; + + // If true, we receive all the pending inbound messages before + // terminating. If false, we terminate immediately when the peer + // asks us to. + bool delay; + + // Returns true if the message is delimiter; false otherwise. + static bool is_delimiter (msg_t &msg_); + + // Computes appropriate low watermark from the given high watermark. + static int compute_lwm (int hwm_); + + // Disable copying. + pipe_t (const pipe_t&); + const pipe_t &operator = (const pipe_t&); }; } diff --git a/src/pull.cpp b/src/pull.cpp index b9d4433..66457b8 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -27,19 +27,33 @@ zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_) : fq (this) { options.type = ZMQ_PULL; - options.requires_in = true; - options.requires_out = false; } zmq::pull_t::~pull_t () { } -void zmq::pull_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) +void zmq::pull_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { - zmq_assert (inpipe_ && !outpipe_); - fq.attach (inpipe_); + zmq_assert (pipe_); + pipe_->set_event_sink (this); + fq.attach (pipe_); +} + +void zmq::pull_t::read_activated (pipe_t *pipe_) +{ + fq.activated (pipe_); +} + +void zmq::pull_t::write_activated (pipe_t *pipe_) +{ + // There are no outbound messages in pull socket. This should never happen. + zmq_assert (false); +} + +void zmq::pull_t::terminated (pipe_t *pipe_) +{ + fq.terminated (pipe_); } void zmq::pull_t::process_term (int linger_) diff --git a/src/pull.hpp b/src/pull.hpp index ffc3fdb..af59724 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -22,12 +22,15 @@ #define __ZMQ_PULL_HPP_INCLUDED__ #include "socket_base.hpp" +#include "pipe.hpp" #include "fq.hpp" namespace zmq { - class pull_t : public socket_base_t + class pull_t : + public socket_base_t, + public i_pipe_events { public: @@ -37,13 +40,17 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, - const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); private: + // i_pipe_events interface implementation. + void read_activated (pipe_t *pipe_); + void write_activated (pipe_t *pipe_); + void terminated (pipe_t *pipe_); + // Hook into the termination process. void process_term (int linger_); diff --git a/src/push.cpp b/src/push.cpp index d6ee399..12fc8d2 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -28,19 +28,33 @@ zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_) : lb (this) { options.type = ZMQ_PUSH; - options.requires_in = false; - options.requires_out = true; } zmq::push_t::~push_t () { } -void zmq::push_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) +void zmq::push_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { - zmq_assert (!inpipe_ && outpipe_); - lb.attach (outpipe_); + zmq_assert (pipe_); + pipe_->set_event_sink (this); + lb.attach (pipe_); +} + +void zmq::push_t::read_activated (pipe_t *pipe_) +{ + // There are no inbound messages in push socket. This should never happen. + zmq_assert (false); +} + +void zmq::push_t::write_activated (pipe_t *pipe_) +{ + lb.activated (pipe_); +} + +void zmq::push_t::terminated (pipe_t *pipe_) +{ + lb.terminated (pipe_); } void zmq::push_t::process_term (int linger_) diff --git a/src/push.hpp b/src/push.hpp index c4d63f6..67763eb 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -22,12 +22,15 @@ #define __ZMQ_PUSH_HPP_INCLUDED__ #include "socket_base.hpp" +#include "pipe.hpp" #include "lb.hpp" namespace zmq { - class push_t : public socket_base_t + class push_t : + public socket_base_t, + public i_pipe_events { public: @@ -37,13 +40,17 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, - const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); int xsend (class msg_t *msg_, int flags_); bool xhas_out (); private: + // i_pipe_events interface implementation. + void read_activated (pipe_t *pipe_); + void write_activated (pipe_t *pipe_); + void terminated (pipe_t *pipe_); + // Hook into the termination process. void process_term (int linger_); diff --git a/src/session.cpp b/src/session.cpp index 499fe40..5ef21c7 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -29,13 +29,12 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, class socket_base_t *socket_, const options_t &options_) : own_t (io_thread_, options_), io_object_t (io_thread_), - in_pipe (NULL), + pipe (NULL), incomplete_in (false), - out_pipe (NULL), engine (NULL), socket (socket_), io_thread (io_thread_), - pipes_attached (false), + pipe_attached (false), delimiter_processed (false), force_terminate (false), has_linger_timer (false), @@ -45,8 +44,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, zmq::session_t::~session_t () { - zmq_assert (!in_pipe); - zmq_assert (!out_pipe); + zmq_assert (!pipe); if (engine) engine->terminate (); @@ -66,13 +64,9 @@ void zmq::session_t::proceed_with_term () has_linger_timer = false; } - if (in_pipe) { + if (pipe) { register_term_acks (1); - in_pipe->terminate (); - } - if (out_pipe) { - register_term_acks (1); - out_pipe->terminate (); + pipe->terminate (); } // The session has already waited for the linger period. We don't want @@ -82,10 +76,10 @@ void zmq::session_t::proceed_with_term () bool zmq::session_t::read (msg_t *msg_) { - if (!in_pipe) + if (!pipe) return false; - if (!in_pipe->read (msg_)) + if (!pipe->read (msg_)) return false; incomplete_in = msg_->flags () & msg_t::more; @@ -94,7 +88,7 @@ bool zmq::session_t::read (msg_t *msg_) bool zmq::session_t::write (msg_t *msg_) { - if (out_pipe && out_pipe->write (msg_)) { + if (pipe && pipe->write (msg_)) { int rc = msg_->init (); errno_assert (rc == 0); return true; @@ -105,21 +99,20 @@ bool zmq::session_t::write (msg_t *msg_) void zmq::session_t::flush () { - if (out_pipe) - out_pipe->flush (); + if (pipe) + pipe->flush (); } void zmq::session_t::clean_pipes () { - // Get rid of half-processed messages in the out pipe. Flush any - // unflushed messages upstream. - if (out_pipe) { - out_pipe->rollback (); - out_pipe->flush (); - } + if (pipe) { - // Remove any half-read message from the in pipe. - if (in_pipe) { + // Get rid of half-processed messages in the out pipe. Flush any + // unflushed messages upstream. + pipe->rollback (); + pipe->flush (); + + // Remove any half-read message from the in pipe. while (incomplete_in) { msg_t msg; int rc = msg.init (); @@ -134,78 +127,54 @@ void zmq::session_t::clean_pipes () } } -void zmq::session_t::attach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) +void zmq::session_t::attach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { - zmq_assert (!pipes_attached); - pipes_attached = true; + zmq_assert (!pipe_attached); + pipe_attached = true; - if (inpipe_) { - zmq_assert (!in_pipe); - in_pipe = inpipe_; - in_pipe->set_event_sink (this); - } - - if (outpipe_) { - zmq_assert (!out_pipe); - out_pipe = outpipe_; - out_pipe->set_event_sink (this); + if (pipe_) { + zmq_assert (!pipe); + pipe = pipe_; + pipe->set_event_sink (this); } // If we are already terminating, terminate the pipes straight away. if (state == terminating) { - if (in_pipe) { - in_pipe->terminate (); - register_term_acks (1); - } - if (out_pipe) { - out_pipe->terminate (); + if (pipe) { + pipe->terminate (); register_term_acks (1); } } } -void zmq::session_t::delimited (reader_t *pipe_) +void zmq::session_t::terminated (pipe_t *pipe_) { - zmq_assert (in_pipe == pipe_); - zmq_assert (!delimiter_processed); - delimiter_processed = true; + zmq_assert (pipe == pipe_); - // If we are in process of being closed, but still waiting for all - // pending messeges being sent, we can terminate here. + // If we are in process of being closed, but still waiting for all + // pending messeges being sent, we can terminate here. if (state == pending) proceed_with_term (); -} -void zmq::session_t::terminated (reader_t *pipe_) -{ - zmq_assert (in_pipe == pipe_); - in_pipe = NULL; + pipe = NULL; if (state == terminating) unregister_term_ack (); } -void zmq::session_t::terminated (writer_t *pipe_) +void zmq::session_t::read_activated (pipe_t *pipe_) { - zmq_assert (out_pipe == pipe_); - out_pipe = NULL; - if (state == terminating) - unregister_term_ack (); -} - -void zmq::session_t::activated (reader_t *pipe_) -{ - zmq_assert (in_pipe == pipe_); + zmq_assert (pipe == pipe_); if (likely (engine != NULL)) engine->activate_out (); else - in_pipe->check_read (); + pipe->check_read (); } -void zmq::session_t::activated (writer_t *pipe_) +void zmq::session_t::write_activated (pipe_t *pipe_) { - zmq_assert (out_pipe == pipe_); + zmq_assert (pipe == pipe_); + if (engine) engine->activate_in (); } @@ -240,29 +209,27 @@ void zmq::session_t::process_attach (i_engine *engine_, return; } - // Check whether the required pipes already exist. If not so, we'll - // create them and bind them to the socket object. - if (!pipes_attached) { - zmq_assert (!in_pipe && !out_pipe); - pipes_attached = true; - reader_t *socket_reader = NULL; - writer_t *socket_writer = NULL; - - // Create the pipes, as required. - if (options.requires_in) { - create_pipe (socket, this, options.rcvhwm, &socket_reader, - &out_pipe); - out_pipe->set_event_sink (this); - } - if (options.requires_out) { - create_pipe (this, socket, options.sndhwm, &in_pipe, - &socket_writer); - in_pipe->set_event_sink (this); - } + // Check whether the required pipe already exists and create it + // if it does not. + if (!pipe_attached) { + zmq_assert (!pipe); + pipe_attached = true; + + object_t *parents [2] = {this, socket}; + pipe_t *pipes [2] = {NULL, NULL}; + int hwms [2] = {options.rcvhwm, options.sndhwm}; + bool delays [2] = {true, true}; + int rc = pipepair (parents, pipes, hwms, delays); + errno_assert (rc == 0); + + // Plug the local end of the pipe. + pipes [0]->set_event_sink (this); + + // Remember the local end of the pipe. + pipe = pipes [0]; - // Bind the pipes to the socket object. - if (socket_reader || socket_writer) - send_bind (socket, socket_reader, socket_writer, peer_identity_); + // Ask socket to plug into the remote end of the pipe. + send_bind (socket, pipes [1], peer_identity_); } // Plug in the engine. @@ -282,9 +249,9 @@ void zmq::session_t::detach () // Send the event to the derived class. detached (); - // Just in case, there's only a delimiter in the inbound pipe. - if (in_pipe) - in_pipe->check_read (); + // Just in case there's only a delimiter in the inbound pipe. + if (pipe) + pipe->check_read (); } void zmq::session_t::process_term (int linger_) @@ -308,16 +275,16 @@ void zmq::session_t::process_term (int linger_) // If there's no engine and there's only delimiter in the pipe it wouldn't // be ever read. Thus we check for it explicitly. - if (in_pipe) - in_pipe->check_read (); + if (pipe) + pipe->check_read (); - // If there's no in pipe there are no pending messages to send. + // If there's no in pipe, there are no pending messages to send. // We can proceed with the shutdown straight away. Also, if there is - // inbound pipe, but the delimiter was already processed, we can - // terminate immediately. Alternatively, if the derived session type have + // pipe, but the delimiter was already processed, we can terminate + // immediately. Alternatively, if the derived session type have // called 'terminate' we'll finish straight away. - if (!options.requires_out || delimiter_processed || force_terminate || - (!options.immediate_connect && !in_pipe)) + if (delimiter_processed || force_terminate || + (!options.immediate_connect && !pipe)) proceed_with_term (); } diff --git a/src/session.hpp b/src/session.hpp index d2f8882..4a12d68 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -34,8 +34,7 @@ namespace zmq public own_t, public io_object_t, public i_inout, - public i_reader_events, - public i_writer_events + public i_pipe_events { public: @@ -50,17 +49,12 @@ namespace zmq void flush (); void detach (); - void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, - const blob_t &peer_identity_); - - // i_reader_events interface implementation. - void activated (class reader_t *pipe_); - void terminated (class reader_t *pipe_); - void delimited (class reader_t *pipe_); + void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); - // i_writer_events interface implementation. - void activated (class writer_t *pipe_); - void terminated (class writer_t *pipe_); + // i_pipe_events interface implementation. + void read_activated (class pipe_t *pipe_); + void write_activated (class pipe_t *pipe_); + void terminated (class pipe_t *pipe_); protected: @@ -103,16 +97,13 @@ namespace zmq // Call this function to move on with the delayed process_term. void proceed_with_term (); - // Inbound pipe, i.e. one the session is getting messages from. - class reader_t *in_pipe; + // Pipe connecting the session to its socket. + class pipe_t *pipe; // This flag is true if the remainder of the message being processed // is still in the in pipe. bool incomplete_in; - // Outbound pipe, i.e. one the socket is sending messages to. - class writer_t *out_pipe; - // The protocol I/O engine connected to the session. struct i_engine *engine; @@ -123,8 +114,8 @@ namespace zmq // the engines into the same thread. class io_thread_t *io_thread; - // If true, pipes were already attached to this session. - bool pipes_attached; + // If true, pipe was already attached to this session. + bool pipe_attached; // If true, delimiter was already read from the inbound pipe. bool delimiter_processed; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 3e104a8..baa4bd2 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -211,17 +211,17 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) return 0; } -void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) +void zmq::socket_base_t::attach_pipe (class pipe_t *pipe_, + const blob_t &peer_identity_) { // If the peer haven't specified it's identity, let's generate one. if (peer_identity_.size ()) { - xattach_pipes (inpipe_, outpipe_, peer_identity_); + xattach_pipe (pipe_, peer_identity_); } else { blob_t identity (17, 0); generate_uuid ((unsigned char*) identity.data () + 1); - xattach_pipes (inpipe_, outpipe_, identity); + xattach_pipe (pipe_, identity); } } @@ -378,11 +378,6 @@ int zmq::socket_base_t::connect (const char *addr_) if (!peer.socket) return -1; - reader_t *inpipe_reader = NULL; - writer_t *inpipe_writer = NULL; - reader_t *outpipe_reader = NULL; - writer_t *outpipe_writer = NULL; - // The total HWM for an inproc connection should be the sum of // the binder's HWM and the connector's HWM. int sndhwm; @@ -396,24 +391,21 @@ int zmq::socket_base_t::connect (const char *addr_) else rcvhwm = options.rcvhwm + peer.options.sndhwm; - // Create inbound pipe, if required. - if (options.requires_in) - create_pipe (this, peer.socket, rcvhwm, &inpipe_reader, - &inpipe_writer); - - // Create outbound pipe, if required. - if (options.requires_out) - create_pipe (peer.socket, this, sndhwm, &outpipe_reader, - &outpipe_writer); + // Create a bi-directional pipe to connect the peers. + object_t *parents [2] = {this, peer.socket}; + pipe_t *pipes [2] = {NULL, NULL}; + int hwms [2] = {sndhwm, rcvhwm}; + bool delays [2] = {true, true}; + int rc = pipepair (parents, pipes, hwms, delays); + errno_assert (rc == 0); - // Attach the pipes to this socket object. - attach_pipes (inpipe_reader, outpipe_writer, peer.options.identity); + // Attach local end of the pipe to this socket object. + attach_pipe (pipes [0], peer.options.identity); - // Attach the pipes to the peer socket. Note that peer's seqnum - // was incremented in find_endpoint function. We don't need it + // Attach remote end of the pipe to the peer socket. Note that peer's + // seqnum was incremented in find_endpoint function. We don't need it // increased here. - send_bind (peer.socket, outpipe_reader, inpipe_writer, - options.identity, false); + send_bind (peer.socket, pipes [1], options.identity, false); return 0; } @@ -435,26 +427,19 @@ int zmq::socket_base_t::connect (const char *addr_) // session once the connection is established. if (options.immediate_connect) { - reader_t *inpipe_reader = NULL; - writer_t *inpipe_writer = NULL; - reader_t *outpipe_reader = NULL; - writer_t *outpipe_writer = NULL; - - // Create inbound pipe, if required. - if (options.requires_in) - create_pipe (this, session, options.rcvhwm, - &inpipe_reader, &inpipe_writer); - - // Create outbound pipe, if required. - if (options.requires_out) - create_pipe (session, this, options.sndhwm, - &outpipe_reader, &outpipe_writer); + // Create a bi-directional pipe. + object_t *parents [2] = {this, session}; + pipe_t *pipes [2] = {NULL, NULL}; + int hwms [2] = {options.sndhwm, options.rcvhwm}; + bool delays [2] = {true, true}; + int rc = pipepair (parents, pipes, hwms, delays); + errno_assert (rc == 0); - // Attach the pipes to the socket object. - attach_pipes (inpipe_reader, outpipe_writer, blob_t ()); + // Attach local end of the pipe to the socket object. + attach_pipe (pipes [0], blob_t ()); - // Attach the pipes to the session object. - session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ()); + // Attach remote end of the pipe to the session object. + session->attach_pipe (pipes [1], blob_t ()); } // Activate the session. Make it a child of this socket. @@ -718,10 +703,10 @@ void zmq::socket_base_t::process_stop () ctx_terminated = true; } -void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, +void zmq::socket_base_t::process_bind (pipe_t *pipe_, const blob_t &peer_identity_) { - attach_pipes (in_pipe_, out_pipe_, peer_identity_); + attach_pipe (pipe_, peer_identity_); } void zmq::socket_base_t::process_unplug () diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 0a5c574..531751b 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -110,8 +110,8 @@ namespace zmq // Concrete algorithms for the x- methods are to be defined by // individual socket types. - virtual void xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) = 0; + virtual void xattach_pipe (class pipe_t *pipe_, + const blob_t &peer_identity_) = 0; // The default implementation assumes there are no specific socket // options for the particular socket type. If not so, overload this @@ -156,9 +156,8 @@ namespace zmq // bind, is available and compatible with the socket type. int check_protocol (const std::string &protocol_); - // If no identity set generate one and call xattach_pipes (). - void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, - const blob_t &peer_identity_); + // If no identity is set, generate one and call xattach_pipe (). + void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); // Processes commands sent to this socket (if any). If 'block' is // set to true, returns only after at least one command was processed. @@ -168,8 +167,7 @@ namespace zmq // Handlers for incoming commands. void process_stop (); - void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_, - const blob_t &peer_identity_); + void process_bind (class pipe_t *pipe_, const blob_t &peer_identity_); void process_unplug (); // Socket's mailbox object. diff --git a/src/xpub.cpp b/src/xpub.cpp index 2b5c4eb..888b42d 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -28,19 +28,33 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) : dist (this) { options.type = ZMQ_XPUB; - options.requires_in = false; - options.requires_out = true; } zmq::xpub_t::~xpub_t () { } -void zmq::xpub_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) +void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { - zmq_assert (!inpipe_ && outpipe_); - dist.attach (outpipe_); + zmq_assert (pipe_); + pipe_->set_event_sink (this); + dist.attach (pipe_); +} + +void zmq::xpub_t::read_activated (pipe_t *pipe_) +{ + // PUB socket never receives messages. This should never happen. + zmq_assert (false); +} + +void zmq::xpub_t::write_activated (pipe_t *pipe_) +{ + dist.activated (pipe_); +} + +void zmq::xpub_t::terminated (pipe_t *pipe_) +{ + dist.terminated (pipe_); } void zmq::xpub_t::process_term (int linger_) diff --git a/src/xpub.hpp b/src/xpub.hpp index 19aa38a..48efd17 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -29,7 +29,9 @@ namespace zmq { - class xpub_t : public socket_base_t + class xpub_t : + public socket_base_t, + public i_pipe_events { public: @@ -37,8 +39,7 @@ namespace zmq ~xpub_t (); // Implementations of virtual functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, - const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); int xsend (class msg_t *msg_, int flags_); bool xhas_out (); int xrecv (class msg_t *msg_, int flags_); @@ -46,6 +47,11 @@ namespace zmq private: + // i_pipe_events interface implementation. + void read_activated (pipe_t *pipe_); + void write_activated (pipe_t *pipe_); + void terminated (pipe_t *pipe_); + // Hook into the termination process. void process_term (int linger_); diff --git a/src/xrep.cpp b/src/xrep.cpp index 2650f4e..d82890d 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -32,8 +32,6 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : terminating (false) { options.type = ZMQ_XREP; - options.requires_in = true; - options.requires_out = true; // On connect, pipes are created only after initial handshaking. // That way we are aware of the peer's identity when binding to the pipes. @@ -46,36 +44,26 @@ zmq::xrep_t::~xrep_t () zmq_assert (outpipes.empty ()); } -void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, - const blob_t &peer_identity_) +void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { - if (outpipe_) { - - outpipe_->set_event_sink (this); - - // TODO: What if new connection has same peer identity as the old one? - outpipe_t outpipe = {outpipe_, true}; - bool ok = outpipes.insert (outpipes_t::value_type ( - peer_identity_, outpipe)).second; - zmq_assert (ok); - - if (terminating) { - register_term_acks (1); - outpipe_->terminate (); - } - } - - if (inpipe_) { - - inpipe_->set_event_sink (this); - - inpipe_t inpipe = {inpipe_, peer_identity_, true}; - inpipes.push_back (inpipe); - - if (terminating) { - register_term_acks (1); - inpipe_->terminate (); - } + zmq_assert (pipe_); + pipe_->set_event_sink (this); + + // Add the pipe to the map out outbound pipes. + // TODO: What if new connection has same peer identity as the old one? + outpipe_t outpipe = {pipe_, true}; + bool ok = outpipes.insert (outpipes_t::value_type ( + peer_identity_, outpipe)).second; + zmq_assert (ok); + + // Add the pipe to the list of inbound pipes. + inpipe_t inpipe = {pipe_, peer_identity_, true}; + inpipes.push_back (inpipe); + + // In case we are already terminating, ask this pipe to terminate as well. + if (terminating) { + register_term_acks (1); + pipe_->terminate (); } } @@ -85,21 +73,17 @@ void zmq::xrep_t::process_term (int linger_) register_term_acks ((int) (inpipes.size () + outpipes.size ())); - for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); - ++it) - it->reader->terminate (); - for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); - ++it) - it->second.writer->terminate (); + for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it) + it->pipe->terminate (); socket_base_t::process_term (linger_); } -void zmq::xrep_t::terminated (reader_t *pipe_) +void zmq::xrep_t::terminated (pipe_t *pipe_) { for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it) { - if (it->reader == pipe_) { + if (it->pipe == pipe_) { if ((inpipes_t::size_type) (it - inpipes.begin ()) < current_in) current_in--; inpipes.erase (it); @@ -107,17 +91,15 @@ void zmq::xrep_t::terminated (reader_t *pipe_) current_in = 0; if (terminating) unregister_term_ack (); - return; + goto clean_outpipes; } } zmq_assert (false); -} -void zmq::xrep_t::terminated (writer_t *pipe_) -{ +clean_outpipes: for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); ++it) { - if (it->second.writer == pipe_) { + if (it->second.pipe == pipe_) { outpipes.erase (it); if (pipe_ == current_out) current_out = NULL; @@ -129,15 +111,11 @@ void zmq::xrep_t::terminated (writer_t *pipe_) zmq_assert (false); } -void zmq::xrep_t::delimited (reader_t *pipe_) -{ -} - -void zmq::xrep_t::activated (reader_t *pipe_) +void zmq::xrep_t::read_activated (pipe_t *pipe_) { for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it) { - if (it->reader == pipe_) { + if (it->pipe == pipe_) { zmq_assert (!it->active); it->active = true; return; @@ -146,11 +124,11 @@ void zmq::xrep_t::activated (reader_t *pipe_) zmq_assert (false); } -void zmq::xrep_t::activated (writer_t *pipe_) +void zmq::xrep_t::write_activated (pipe_t *pipe_) { for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); ++it) { - if (it->second.writer == pipe_) { + if (it->second.pipe == pipe_) { zmq_assert (!it->second.active); it->second.active = true; return; @@ -178,7 +156,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) outpipes_t::iterator it = outpipes.find (identity); if (it != outpipes.end ()) { - current_out = it->second.writer; + current_out = it->second.pipe; msg_t empty; int rc = empty.init (); errno_assert (rc == 0); @@ -245,7 +223,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) // If we are in the middle of reading a message, just grab next part of it. if (more_in) { zmq_assert (inpipes [current_in].active); - bool fetched = inpipes [current_in].reader->read (msg_); + bool fetched = inpipes [current_in].pipe->read (msg_); zmq_assert (fetched); more_in = msg_->flags () & msg_t::more; if (!more_in) { @@ -261,7 +239,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) // Try to fetch new message. if (inpipes [current_in].active) - prefetched = inpipes [current_in].reader->read (&prefetched_msg); + prefetched = inpipes [current_in].pipe->read (&prefetched_msg); // If we have a message, create a prefix and return it to the caller. if (prefetched) { @@ -311,7 +289,7 @@ bool zmq::xrep_t::xhas_in () // pipe holding messages, skipping only pipes with no messages available. for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) { if (inpipes [current_in].active && - inpipes [current_in].reader->check_read ()) + inpipes [current_in].pipe->check_read ()) return true; // If me don't have a message, mark the pipe as passive and diff --git a/src/xrep.hpp b/src/xrep.hpp index 7ca138c..d0378c2 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -35,8 +35,7 @@ namespace zmq // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. class xrep_t : public socket_base_t, - public i_reader_events, - public i_writer_events + public i_pipe_events { public: @@ -44,8 +43,7 @@ namespace zmq ~xrep_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, - const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); int xsend (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); @@ -61,18 +59,14 @@ namespace zmq // Hook into the termination process. void process_term (int linger_); - // i_reader_events interface implementation. - void activated (reader_t *pipe_); - void terminated (reader_t *pipe_); - void delimited (reader_t *pipe_); - - // i_writer_events interface implementation. - void activated (writer_t *pipe_); - void terminated (writer_t *pipe_); + // i_pipe_events interface implementation. + void read_activated (pipe_t *pipe_); + void write_activated (pipe_t *pipe_); + void terminated (pipe_t *pipe_); struct inpipe_t { - class reader_t *reader; + class pipe_t *pipe; blob_t identity; bool active; }; @@ -95,7 +89,7 @@ namespace zmq struct outpipe_t { - class writer_t *writer; + class pipe_t *pipe; bool active; }; @@ -104,7 +98,7 @@ namespace zmq outpipes_t outpipes; // The pipe we are currently writing to. - class writer_t *current_out; + class pipe_t *current_out; // If true, more outgoing message parts are expected. bool more_out; diff --git a/src/xreq.cpp b/src/xreq.cpp index 2fda2c1..4a6e67e 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -28,20 +28,18 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : lb (this) { options.type = ZMQ_XREQ; - options.requires_in = true; - options.requires_out = true; } zmq::xreq_t::~xreq_t () { } -void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) +void zmq::xreq_t::xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_) { - zmq_assert (inpipe_ && outpipe_); - fq.attach (inpipe_); - lb.attach (outpipe_); + zmq_assert (pipe_); + pipe_->set_event_sink (this); + fq.attach (pipe_); + lb.attach (pipe_); } void zmq::xreq_t::process_term (int linger_) @@ -71,3 +69,19 @@ bool zmq::xreq_t::xhas_out () return lb.has_out (); } +void zmq::xreq_t::read_activated (pipe_t *pipe_) +{ + fq.activated (pipe_); +} + +void zmq::xreq_t::write_activated (pipe_t *pipe_) +{ + lb.activated (pipe_); +} + +void zmq::xreq_t::terminated (pipe_t *pipe_) +{ + fq.terminated (pipe_); + lb.terminated (pipe_); +} + diff --git a/src/xreq.hpp b/src/xreq.hpp index e0cafe5..a75e5c8 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -23,13 +23,16 @@ #define __ZMQ_XREQ_HPP_INCLUDED__ #include "socket_base.hpp" +#include "pipe.hpp" #include "fq.hpp" #include "lb.hpp" namespace zmq { - class xreq_t : public socket_base_t + class xreq_t : + public socket_base_t, + public i_pipe_events { public: @@ -39,8 +42,7 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, - const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); int xsend (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); @@ -48,6 +50,11 @@ namespace zmq private: + // i_pipe_events interface implementation. + void read_activated (pipe_t *pipe_); + void write_activated (pipe_t *pipe_); + void terminated (pipe_t *pipe_); + // Hook into the termination process. void process_term (int linger_); diff --git a/src/xsub.cpp b/src/xsub.cpp index b0e8cd2..dc30d71 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -30,8 +30,6 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) : more (false) { options.type = ZMQ_XSUB; - options.requires_in = true; - options.requires_out = false; int rc = message.init (); errno_assert (rc == 0); } @@ -42,11 +40,27 @@ zmq::xsub_t::~xsub_t () errno_assert (rc == 0); } -void zmq::xsub_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) +void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { - zmq_assert (inpipe_ && !outpipe_); - fq.attach (inpipe_); + zmq_assert (pipe_); + pipe_->set_event_sink (this); + fq.attach (pipe_); +} + +void zmq::xsub_t::read_activated (pipe_t *pipe_) +{ + fq.activated (pipe_); +} + +void zmq::xsub_t::write_activated (pipe_t *pipe_) +{ + // SUB socket never sends messages. This should never happen. + zmq_assert (false); +} + +void zmq::xsub_t::terminated (pipe_t *pipe_) +{ + fq.terminated (pipe_); } void zmq::xsub_t::process_term (int linger_) diff --git a/src/xsub.hpp b/src/xsub.hpp index 202a29f..ed9c462 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -23,13 +23,16 @@ #include "trie.hpp" #include "socket_base.hpp" +#include "pipe.hpp" #include "msg.hpp" #include "fq.hpp" namespace zmq { - class xsub_t : public socket_base_t + class xsub_t : + public socket_base_t, + public i_pipe_events { public: @@ -39,8 +42,7 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, - const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); int xsend (class msg_t *msg_, int options_); bool xhas_out (); int xrecv (class msg_t *msg_, int flags_); @@ -48,6 +50,11 @@ namespace zmq private: + // i_pipe_events interface implementation. + void read_activated (pipe_t *pipe_); + void write_activated (pipe_t *pipe_); + void terminated (pipe_t *pipe_); + // Hook into the termination process. void process_term (int linger_); -- cgit v1.2.3