diff options
| -rw-r--r-- | src/command.hpp | 13 | ||||
| -rw-r--r-- | src/fq.cpp | 4 | ||||
| -rw-r--r-- | src/object.cpp | 26 | ||||
| -rw-r--r-- | src/object.hpp | 8 | ||||
| -rw-r--r-- | src/pipe.cpp | 8 | ||||
| -rw-r--r-- | src/pipe.hpp | 4 | ||||
| -rw-r--r-- | src/socket_base.cpp | 5 | 
7 files changed, 34 insertions, 34 deletions
| diff --git a/src/command.hpp b/src/command.hpp index a924b4e..a72d3ca 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -39,8 +39,8 @@ namespace zmq              own,              attach,              bind, -            revive, -            reader_info, +            activate_reader, +            activate_writer,              pipe_term,              pipe_term_ack,              term_req, @@ -83,14 +83,13 @@ namespace zmq              //  Sent by pipe writer to inform dormant pipe reader that there              //  are messages in the pipe.              struct { -            } revive; +            } activate_reader; -            //  Sent by pipe reader to inform pipe writer -            //  about how many messages it has read so far. -            //  Used to implement the flow control. +            //  Sent by pipe reader to inform pipe writer about how many +            //  messages it has read so far.              struct {                  uint64_t msgs_read; -            } reader_info; +            } activate_writer;              //  Sent by pipe reader to pipe writer to ask it to terminate              //  its end of the pipe. @@ -103,8 +103,8 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)          //  without blocking.          zmq_assert (!(more && !fetched)); -        //  Note that when message is not fetched, current pipe is killed and -        //  replaced by another active pipe. Thus we don't have to increase +        //  Note that when message is not fetched, current pipe is deactivated +        //  and replaced by another active pipe. Thus we don't have to increase          //  the 'current' pointer.          if (fetched) {              more = msg_->flags & ZMQ_MSG_MORE; diff --git a/src/object.cpp b/src/object.cpp index 3466431..5f4a94e 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -57,8 +57,12 @@ void zmq::object_t::process_command (command_t &cmd_)  {      switch (cmd_.type) { -    case command_t::revive: -        process_revive (); +    case command_t::activate_reader: +        process_activate_reader (); +        break; + +    case command_t::activate_writer: +        process_activate_writer (cmd_.args.activate_writer.msgs_read);          break;      case command_t::stop: @@ -89,10 +93,6 @@ void zmq::object_t::process_command (command_t &cmd_)          process_seqnum ();          break; -    case command_t::reader_info: -        process_reader_info (cmd_.args.reader_info.msgs_read); -        break; -      case command_t::pipe_term:          process_pipe_term ();          return; @@ -248,18 +248,18 @@ void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_,      send_command (cmd);  } -void zmq::object_t::send_revive (object_t *destination_) +void zmq::object_t::send_activate_reader (reader_t *destination_)  {      command_t cmd;  #if defined ZMQ_MAKE_VALGRIND_HAPPY      memset (&cmd, 0, sizeof (cmd));  #endif      cmd.destination = destination_; -    cmd.type = command_t::revive; +    cmd.type = command_t::activate_reader;      send_command (cmd);  } -void zmq::object_t::send_reader_info (writer_t *destination_, +void zmq::object_t::send_activate_writer (writer_t *destination_,      uint64_t msgs_read_)  {      command_t cmd; @@ -267,8 +267,8 @@ void zmq::object_t::send_reader_info (writer_t *destination_,      memset (&cmd, 0, sizeof (cmd));  #endif      cmd.destination = destination_; -    cmd.type = command_t::reader_info; -    cmd.args.reader_info.msgs_read = msgs_read_; +    cmd.type = command_t::activate_writer; +    cmd.args.activate_writer.msgs_read = msgs_read_;      send_command (cmd);  } @@ -356,12 +356,12 @@ void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,      zmq_assert (false);  } -void zmq::object_t::process_revive () +void zmq::object_t::process_activate_reader ()  {      zmq_assert (false);  } -void zmq::object_t::process_reader_info (uint64_t msgs_read_) +void zmq::object_t::process_activate_writer (uint64_t msgs_read_)  {      zmq_assert (false);  } diff --git a/src/object.hpp b/src/object.hpp index e083ce3..8652a86 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -68,8 +68,8 @@ namespace zmq          void send_bind (class own_t *destination_,               class reader_t *in_pipe_, class writer_t *out_pipe_,               const blob_t &peer_identity_, bool inc_seqnum_ = true); -        void send_revive (class object_t *destination_); -        void send_reader_info (class writer_t *destination_, +        void send_activate_reader (class reader_t *destination_); +        void send_activate_writer (class writer_t *destination_,               uint64_t msgs_read_);          void send_pipe_term (class writer_t *destination_);          void send_pipe_term_ack (class reader_t *destination_); @@ -87,8 +87,8 @@ namespace zmq              const blob_t &peer_identity_);          virtual void process_bind (class reader_t *in_pipe_,              class writer_t *out_pipe_, const blob_t &peer_identity_); -        virtual void process_revive (); -        virtual void process_reader_info (uint64_t msgs_read_); +        virtual void process_activate_reader (); +        virtual void process_activate_writer (uint64_t msgs_read_);          virtual void process_pipe_term ();          virtual void process_pipe_term_ack ();          virtual void process_term_req (class own_t *object_); diff --git a/src/pipe.cpp b/src/pipe.cpp index 8785330..7fa7133 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -112,7 +112,7 @@ bool zmq::reader_t::read (zmq_msg_t *msg_)          msgs_read++;      if (lwm > 0 && msgs_read % lwm == 0) -        send_reader_info (writer, msgs_read); +        send_activate_writer (writer, msgs_read);      return true;  } @@ -127,7 +127,7 @@ void zmq::reader_t::terminate ()      send_pipe_term (writer);  } -void zmq::reader_t::process_revive () +void zmq::reader_t::process_activate_reader ()  {      //  Forward the event to the sink (either socket or session).      sink->activated (this); @@ -258,7 +258,7 @@ void zmq::writer_t::rollback ()  void zmq::writer_t::flush ()  {      if (!pipe->flush ()) -        send_revive (reader); +        send_activate_reader (reader);  }  void zmq::writer_t::terminate () @@ -288,7 +288,7 @@ void zmq::writer_t::write_delimiter ()      flush ();  } -void zmq::writer_t::process_reader_info (uint64_t msgs_read_) +void zmq::writer_t::process_activate_writer (uint64_t msgs_read_)  {      zmq_msg_t msg; diff --git a/src/pipe.hpp b/src/pipe.hpp index 421ebc9..dcdd927 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -81,7 +81,7 @@ namespace zmq          void set_writer (class writer_t *writer_);          //  Command handlers. -        void process_revive (); +        void process_activate_reader ();          void process_pipe_term_ack ();          //  Returns true if the message is delimiter; false otherwise. @@ -150,7 +150,7 @@ namespace zmq              uint64_t hwm_, int64_t swap_size_);          ~writer_t (); -        void process_reader_info (uint64_t msgs_read_); +        void process_activate_writer (uint64_t msgs_read_);          //  Command handlers.          void process_pipe_term (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fe06d2f..0dae1b2 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -495,8 +495,9 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)      errno = err;      //  If the message cannot be fetched immediately, there are two scenarios. -    //  For non-blocking recv, commands are processed in case there's a revive -    //  command already waiting int a command pipe. If it's not, return EAGAIN. +    //  For non-blocking recv, commands are processed in case there's an +    //  activate_reader command already waiting int a command pipe. +    //  If it's not, return EAGAIN.      if (flags_ & ZMQ_NOBLOCK) {          if (errno != EAGAIN)              return -1; | 
