diff options
40 files changed, 242 insertions, 77 deletions
| diff --git a/src/command.hpp b/src/command.hpp index 150cad1..3d00cd7 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -40,6 +40,7 @@ namespace zmq              attach,              bind,              revive, +            reader_info,              pipe_term,              pipe_term_ack,              term_req, @@ -84,6 +85,13 @@ namespace zmq              struct {              } revive; +            //  Sent by pipe reader to inform pipe writer +            //  about how many messages it has read so far. +            //  Used to implement the flow control. +            struct { +                uint64_t msgs_read; +            } reader_info; +              //  Sent by pipe reader to pipe writer to ask it to terminate              //  its end of the pipe.              struct { diff --git a/src/downstream.cpp b/src/downstream.cpp index 7ff88f1..feeb8c3 100644 --- a/src/downstream.cpp +++ b/src/downstream.cpp @@ -65,6 +65,11 @@ void zmq::downstream_t::xrevive (class reader_t *pipe_)      zmq_assert (false);  } +void zmq::downstream_t::xrevive (class writer_t *pipe_) +{ +    zmq_not_implemented (); +} +  int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,      size_t optvallen_)  { @@ -104,4 +109,3 @@ bool zmq::downstream_t::xhas_out ()      return lb.has_out ();  } - diff --git a/src/downstream.hpp b/src/downstream.hpp index dbd79a5..998ab73 100644 --- a/src/downstream.hpp +++ b/src/downstream.hpp @@ -40,6 +40,7 @@ namespace zmq          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class reader_t *pipe_);          void xrevive (class reader_t *pipe_); +        void xrevive (class writer_t *pipe_);          int xsetsockopt (int option_, const void *optval_, size_t optvallen_);          int xsend (zmq_msg_t *msg_, int flags_);          int xflush (); diff --git a/src/err.hpp b/src/err.hpp index 6c13b02..2b76569 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -115,3 +115,9 @@ namespace zmq      } while (false)  #endif + +#define zmq_not_implemented() \ +    do {\ +        fprintf (stderr, "Hic sunt leones (%s:%d)\n", __FILE__, __LINE__);\ +        abort ();\ +    } while (false) diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp index ddab6a4..0d14224 100644 --- a/src/i_endpoint.hpp +++ b/src/i_endpoint.hpp @@ -35,6 +35,7 @@ namespace zmq          virtual void detach_outpipe (class writer_t *pipe_) = 0;          virtual void kill (class reader_t *pipe_) = 0;          virtual void revive (class reader_t *pipe_) = 0; +        virtual void revive (class writer_t *pipe_) = 0;      };  } diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 81b56df..bb5f391 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -41,6 +41,8 @@ namespace zmq          //  are messages to send available.          virtual void revive () = 0; +        virtual void resume_input () = 0; +          //  Engine should add the prefix supplied to all inbound messages.          virtual void add_prefix (const blob_t &identity_) = 0; @@ -54,15 +54,6 @@ void zmq::lb_t::detach (writer_t *pipe_)      pipes.erase (pipe_);  } -void zmq::lb_t::kill (writer_t *pipe_) -{ -    //  Move the pipe to the list of inactive pipes. -    active--; -    if (current == active) -        current = 0; -    pipes.swap (pipes.index (pipe_), active); -} -  void zmq::lb_t::revive (writer_t *pipe_)  {      //  Move the pipe to the list of active pipes. @@ -72,42 +63,46 @@ void zmq::lb_t::revive (writer_t *pipe_)  int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)  { +    while (active > 0) { +        if (pipes [current]->write (msg_)) +            break; + +        active--; +        if (current < active) +            pipes.swap (current, active); +        else +            current = 0; +    } +      //  If there are no pipes we cannot send the message. -    if (pipes.empty ()) { +    if (active == 0) {          errno = EAGAIN;          return -1;      } -    //  TODO: Implement this once queue limits are in-place. -    zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_))); - -    //  Push message to the selected pipe. -    pipes [current]->write (msg_); -    pipes [current]->flush (); +    if (!(flags_ & ZMQ_NOFLUSH)) +        pipes [current]->flush ();      //  Detach the message from the data buffer.      int rc = zmq_msg_init (msg_);      zmq_assert (rc == 0);      //  Move to the next pipe (load-balancing). -    current++; -    if (current >= active) -        current = 0; +    current = (current + 1) % active;      return 0;  }  bool zmq::lb_t::has_out ()  { -    for (int count = active; count != 0; count--) { - -        //  We should be able to write at least 1-byte message to interrupt -        //  polling for POLLOUT. -        //  TODO: Shouldn't we use a saner value here? -        if (pipes [current]->check_write (1)) +    while (active > 0) { +        if (pipes [current]->check_write ())              return true; -        current++; -        if (current >= active) + +        active--; +        if (current < active) +            pipes.swap (current, active); +        else              current = 0;      } @@ -36,7 +36,6 @@ namespace zmq          void attach (class writer_t *pipe_);          void detach (class writer_t *pipe_); -        void kill (class writer_t *pipe_);          void revive (class writer_t *pipe_);          int send (zmq_msg_t *msg_, int flags_);          bool has_out (); diff --git a/src/object.cpp b/src/object.cpp index 356fcd1..5821c89 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -95,6 +95,10 @@ void zmq::object_t::process_command (command_t &cmd_)          process_seqnum ();          break; +    case command_t::reader_info: +        process_reader_info (cmd_.args.reader_info.msgs_read); +        break; +      case command_t::pipe_term:          process_pipe_term ();          return; @@ -249,6 +253,16 @@ void zmq::object_t::send_revive (object_t *destination_)      send_command (cmd);  } +void zmq::object_t::send_reader_info (writer_t *destination_, +    uint64_t msgs_read_) +{ +    command_t cmd; +    cmd.destination = destination_; +    cmd.type = command_t::reader_info; +    cmd.args.reader_info.msgs_read = msgs_read_; +    send_command (cmd); +} +  void zmq::object_t::send_pipe_term (writer_t *destination_)  {      command_t cmd; @@ -323,6 +337,11 @@ void zmq::object_t::process_revive ()      zmq_assert (false);  } +void zmq::object_t::process_reader_info (uint64_t msgs_read_) +{ +    zmq_assert (false); +} +  void zmq::object_t::process_pipe_term ()  {      zmq_assert (false); diff --git a/src/object.hpp b/src/object.hpp index 1544109..f29342e 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -71,6 +71,8 @@ namespace zmq               class reader_t *in_pipe_, class writer_t *out_pipe_,               const blob_t &peer_identity_, bool inc_seqnum_ = true);          void send_revive (class object_t *destination_); +        void send_reader_info (class writer_t *destination_, +             uint64_t msgs_read_);          void send_pipe_term (class writer_t *destination_);          void send_pipe_term_ack (class reader_t *destination_);          void send_term_req (class socket_base_t *destination_, @@ -88,6 +90,7 @@ namespace zmq          virtual void process_bind (class reader_t *in_pipe_,              class writer_t *out_pipe_, const blob_t &peer_identity_);          virtual void process_revive (); +        virtual void process_reader_info (uint64_t msgs_read_);          virtual void process_pipe_term ();          virtual void process_pipe_term_ack ();          virtual void process_term_req (class owned_t *object_); diff --git a/src/options.cpp b/src/options.cpp index c0d5339..a713ede 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -45,19 +45,19 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,      switch (option_) {      case ZMQ_HWM: -        if (optvallen_ != sizeof (int64_t)) { +        if (optvallen_ != sizeof (uint64_t)) {              errno = EINVAL;              return -1;          } -        hwm = *((int64_t*) optval_); +        hwm = *((uint64_t*) optval_);          return 0;      case ZMQ_LWM: -        if (optvallen_ != sizeof (int64_t)) { +        if (optvallen_ != sizeof (uint64_t)) {              errno = EINVAL;              return -1;          } -        lwm = *((int64_t*) optval_); +        lwm = *((uint64_t*) optval_);          return 0;      case ZMQ_SWAP: diff --git a/src/options.hpp b/src/options.hpp index 6d9be4d..eba8ab8 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -33,8 +33,8 @@ namespace zmq          int setsockopt (int option_, const void *optval_, size_t optvallen_); -        int64_t hwm; -        int64_t lwm; +        uint64_t hwm; +        uint64_t lwm;          int64_t swap;          uint64_t affinity;          blob_t identity; diff --git a/src/p2p.cpp b/src/p2p.cpp index 334cfcc..728854b 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -73,6 +73,11 @@ void zmq::p2p_t::xrevive (class reader_t *pipe_)      alive = true;  } +void zmq::p2p_t::xrevive (class writer_t *pipe_) +{ +    zmq_not_implemented (); +} +  int zmq::p2p_t::xsetsockopt (int option_, const void *optval_,      size_t optvallen_)  { @@ -87,10 +92,8 @@ int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_)          return -1;      } -    //  TODO: Implement this once queue limits are in-place. -    zmq_assert (outpipe->check_write (zmq_msg_size (msg_))); - -    outpipe->write (msg_); +    bool written = outpipe->write (msg_); +    zmq_assert (written);      if (!(flags_ & ZMQ_NOFLUSH))          outpipe->flush (); diff --git a/src/p2p.hpp b/src/p2p.hpp index bca0eab..e12b58c 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -39,6 +39,7 @@ namespace zmq          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class reader_t *pipe_);          void xrevive (class reader_t *pipe_); +        void xrevive (class writer_t *pipe_);          int xsetsockopt (int option_, const void *optval_, size_t optvallen_);          int xsend (zmq_msg_t *msg_, int flags_);          int xflush (); diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index e708229..5480030 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -88,6 +88,11 @@ void zmq::pgm_receiver_t::revive ()      zmq_assert (false);  } +void zmq::pgm_receiver_t::resume_input () +{ +   zmq_not_implemented (); +} +  void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_)  {      //  No need for tracerouting functionality in PGM socket at the moment. diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 3f0ef81..dd9402f 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -54,6 +54,7 @@ namespace zmq          void plug (struct i_inout *inout_);          void unplug ();          void revive (); +        void resume_input ();          void add_prefix (const blob_t &identity_);          void trim_prefix (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 27b4d0c..01eac2b 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -102,6 +102,11 @@ void zmq::pgm_sender_t::revive ()      out_event ();  } +void zmq::pgm_sender_t::resume_input () +{ +    zmq_assert (false); +} +  void zmq::pgm_sender_t::add_prefix (const blob_t &identity_)  {      //  No need for tracerouting functionality in PGM socket at the moment. diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 951c417..4b232b1 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -52,6 +52,7 @@ namespace zmq          void plug (struct i_inout *inout_);          void unplug ();          void revive (); +        void resume_input ();          void add_prefix (const blob_t &identity_);          void trim_prefix (); diff --git a/src/pipe.cpp b/src/pipe.cpp index da019c1..53dfb21 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -28,8 +28,12 @@ zmq::reader_t::reader_t (object_t *parent_,      peer (NULL),      hwm (hwm_),      lwm (lwm_), +    msgs_read (0),      endpoint (NULL)  { +    //  Adjust lwm and hwm. +    if (lwm == 0 || lwm > hwm) +        lwm = hwm;  }  zmq::reader_t::~reader_t () @@ -73,7 +77,9 @@ bool zmq::reader_t::read (zmq_msg_t *msg_)          return false;      } -    //  TODO: Adjust the size of the pipe. +    msgs_read++; +    if (lwm > 0 && msgs_read % lwm == 0) +        send_reader_info (peer, msgs_read);      return true;  } @@ -111,8 +117,14 @@ zmq::writer_t::writer_t (object_t *parent_,      peer (NULL),      hwm (hwm_),      lwm (lwm_), +    msgs_read (0), +    msgs_written (0), +    stalled (false),      endpoint (NULL)  { +    //  Adjust lwm and hwm. +    if (lwm == 0 || lwm > hwm) +        lwm = hwm;  }  void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) @@ -131,32 +143,41 @@ void zmq::writer_t::set_pipe (pipe_t *pipe_)      peer = &pipe->reader;  } -bool zmq::writer_t::check_write (uint64_t size_) +bool zmq::writer_t::check_write ()  { -    //  TODO: Check whether hwm is exceeded. +    if (pipe_full ()) { +        stalled = true; +        return false; +    }      return true;  }  bool zmq::writer_t::write (zmq_msg_t *msg_)  { +    if (pipe_full ()) { +        stalled = true; +        return false; +    } +      pipe->write (*msg_); +    msgs_written++;      return true; - -    //  TODO: Adjust size of the pipe.  }  void zmq::writer_t::rollback ()  { -    while (true) { -        zmq_msg_t msg; -        if (!pipe->unwrite (&msg)) -            break; +    zmq_msg_t msg; + +    while (pipe->unwrite (&msg)) {          zmq_msg_close (&msg); +        msgs_written--;      } -    //  TODO: We don't have to inform the reader side of the pipe about -    //  the event. We'll simply adjust the pipe size and keep calm. +    if (stalled && endpoint != NULL && !pipe_full()) { +        stalled = false; +        endpoint->revive (this); +    }  }  void zmq::writer_t::flush () @@ -179,6 +200,15 @@ void zmq::writer_t::term ()      pipe->flush ();  } +void zmq::writer_t::process_reader_info (uint64_t msgs_read_) +{ +    msgs_read = msgs_read_; +    if (stalled && endpoint != NULL) { +        stalled = false; +        endpoint->revive (this); +    } +} +  void zmq::writer_t::process_pipe_term ()  {      if (endpoint) @@ -189,6 +219,11 @@ void zmq::writer_t::process_pipe_term ()      send_pipe_term_ack (p);  } +bool zmq::writer_t::pipe_full () +{ +    return hwm > 0 && msgs_written - msgs_read == hwm; +} +  zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,        uint64_t hwm_, uint64_t lwm_) :      reader (reader_parent_, hwm_, lwm_), diff --git a/src/pipe.hpp b/src/pipe.hpp index df3d0b1..0ac7fc5 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -68,10 +68,8 @@ namespace zmq          uint64_t hwm;          uint64_t lwm; -        //  Positions of head and tail of the pipe (in bytes). -        uint64_t head; -        uint64_t tail; -        uint64_t last_sent_head; +        //  Number of messages read so far. +        uint64_t msgs_read;          //  Endpoint (either session or socket) the pipe is attached to.          i_endpoint *endpoint; @@ -91,10 +89,10 @@ namespace zmq          void set_pipe (class pipe_t *pipe_);          void set_endpoint (i_endpoint *endpoint_); -        //  Checks whether message with specified size can be written to the -        //  pipe. If writing the message would cause high watermark to be +        //  Checks whether a message can be written to the pipe. +        //  If writing the message would cause high watermark to be          //  exceeded, the function returns false. -        bool check_write (uint64_t size_); +        bool check_write ();          //  Writes a message to the underlying pipe. Returns false if the          //  message cannot be written because high watermark was reached. @@ -111,9 +109,14 @@ namespace zmq      private: +        void process_reader_info (uint64_t msgs_read_); +          //  Command handlers.          void process_pipe_term (); +        //  Tests whether the pipe is already full. +        bool pipe_full (); +          //  The underlying pipe.          class pipe_t *pipe; @@ -124,9 +127,15 @@ namespace zmq          uint64_t hwm;          uint64_t lwm; -        //  Positions of head and tail of the pipe (in bytes). -        uint64_t head; -        uint64_t tail; +        //  Last confirmed number of messages read from the pipe. +        //  The actual number can be higher. +        uint64_t msgs_read; + +        //  Number of messages we have written so far. +        uint64_t msgs_written; + +        //  True iff the last attempt to write a message has failed. +        bool stalled;          //  Endpoint (either session or socket) the pipe is attached to.          i_endpoint *endpoint; diff --git a/src/pub.cpp b/src/pub.cpp index 643e29e..b6802fd 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -65,6 +65,11 @@ void zmq::pub_t::xrevive (class reader_t *pipe_)      zmq_assert (false);  } +void zmq::pub_t::xrevive (class writer_t *pipe_) +{ +    zmq_not_implemented (); +} +  int zmq::pub_t::xsetsockopt (int option_, const void *optval_,      size_t optvallen_)  { @@ -87,7 +92,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)      //  First check whether all pipes are available for writing.      for (out_pipes_t::size_type i = 0; i != pipes_count; i++) -        if (!out_pipes [i]->check_write (zmq_msg_size (msg_))) { +        if (!out_pipes [i]->check_write ()) {              errno = EAGAIN;              return -1;          } @@ -97,7 +102,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)      //  For VSMs the copying is straighforward.      if (content == (msg_content_t*) ZMQ_VSM) {          for (out_pipes_t::size_type i = 0; i != pipes_count; i++) { -            out_pipes [i]->write (msg_); +            bool written = out_pipes [i]->write (msg_); +            zmq_assert (written);              if (!(flags_ & ZMQ_NOFLUSH))                  out_pipes [i]->flush ();          } @@ -110,7 +116,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)      //  to send the message to - no refcount adjustment i.e. no atomic      //  operations are needed.      if (pipes_count == 1) { -        out_pipes [0]->write (msg_); +        bool written = out_pipes [0]->write (msg_); +        zmq_assert (written);          if (!(flags_ & ZMQ_NOFLUSH))              out_pipes [0]->flush ();          int rc = zmq_msg_init (msg_); @@ -130,7 +137,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)      //  Push the message to all destinations.      for (out_pipes_t::size_type i = 0; i != pipes_count; i++) { -        out_pipes [i]->write (msg_); +        bool written = out_pipes [i]->write (msg_); +        zmq_assert (written);          if (!(flags_ & ZMQ_NOFLUSH))              out_pipes [i]->flush ();      } diff --git a/src/pub.hpp b/src/pub.hpp index 26142a4..a85301f 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -40,6 +40,7 @@ namespace zmq          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class reader_t *pipe_);          void xrevive (class reader_t *pipe_); +        void xrevive (class writer_t *pipe_);          int xsetsockopt (int option_, const void *optval_, size_t optvallen_);          int xsend (zmq_msg_t *msg_, int flags_);          int xflush (); diff --git a/src/rep.cpp b/src/rep.cpp index 4e69fa3..2cd4144 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -144,6 +144,11 @@ void zmq::rep_t::xrevive (class reader_t *pipe_)      active++;  } +void zmq::rep_t::xrevive (class writer_t *pipe_) +{ +    zmq_not_implemented (); +} +  int zmq::rep_t::xsetsockopt (int option_, const void *optval_,      size_t optvallen_)  { @@ -160,12 +165,13 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)      //  TODO: Implement this once queue limits are in-place. If the reply      //  overloads the buffer, connection should be torn down. -    zmq_assert (reply_pipe->check_write (zmq_msg_size (msg_))); +    zmq_assert (reply_pipe->check_write ());      //  Push message to the selected pipe. If requester have disconnected      //  in the meantime, drop the reply.      if (reply_pipe) { -        reply_pipe->write (msg_); +        bool written = reply_pipe->write (msg_); +        zmq_assert (written);          reply_pipe->flush ();      }      else { diff --git a/src/rep.hpp b/src/rep.hpp index 7ead321..9d2357d 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -40,6 +40,7 @@ namespace zmq          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class reader_t *pipe_);          void xrevive (class reader_t *pipe_); +        void xrevive (class writer_t *pipe_);          int xsetsockopt (int option_, const void *optval_, size_t optvallen_);          int xsend (zmq_msg_t *msg_, int flags_);          int xflush (); diff --git a/src/req.cpp b/src/req.cpp index 5c067b3..f21613a 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -108,6 +108,11 @@ void zmq::req_t::xrevive (class reader_t *pipe_)      reply_pipe_active = true;  } +void zmq::req_t::xrevive (class writer_t *pipe_) +{ +    zmq_not_implemented (); +} +  int zmq::req_t::xsetsockopt (int option_, const void *optval_,      size_t optvallen_)  { @@ -142,11 +147,9 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)              current = 0;      } -    //  TODO: Implement this once queue limits are in-place. -    zmq_assert (out_pipes [current]->check_write (zmq_msg_size (msg_))); -      //  Push message to the selected pipe. -    out_pipes [current]->write (msg_); +    bool written = out_pipes [current]->write (msg_); +    zmq_assert (written);      out_pipes [current]->flush ();      waiting_for_reply = true; diff --git a/src/req.hpp b/src/req.hpp index da8e61a..4058b08 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -40,6 +40,7 @@ namespace zmq          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class reader_t *pipe_);          void xrevive (class reader_t *pipe_); +        void xrevive (class writer_t *pipe_);          int xsetsockopt (int option_, const void *optval_, size_t optvallen_);          int xsend (zmq_msg_t *msg_, int flags_);          int xflush (); | 
