diff options
| -rw-r--r-- | src/command.hpp | 3 | ||||
| -rw-r--r-- | src/object.cpp | 9 | ||||
| -rw-r--r-- | src/object.hpp | 6 | ||||
| -rw-r--r-- | src/session.cpp | 4 | ||||
| -rw-r--r-- | src/socket_base.cpp | 16 | ||||
| -rw-r--r-- | src/socket_base.hpp | 3 | 
6 files changed, 29 insertions, 12 deletions
| diff --git a/src/command.hpp b/src/command.hpp index a31805b..3099852 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -69,9 +69,12 @@ namespace zmq              } attach;              //  Sent from session to socket to establish pipe(s) between them. +            //  If adjust_seqnum is true, caller have used inc_seqnum beforehand +            //  and thus the callee should take care of catching up.              struct {                  class reader_t *in_pipe;                  class writer_t *out_pipe; +                bool adjust_seqnum;              } bind;              //  Sent by pipe writer to inform dormant pipe reader that there diff --git a/src/object.cpp b/src/object.cpp index 6b05380..b5d5eee 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -83,7 +83,8 @@ void zmq::object_t::process_command (command_t &cmd_)          return;      case command_t::bind: -        process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); +        process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe, +            cmd_.args.bind.adjust_seqnum);          return;      case command_t::pipe_term: @@ -183,13 +184,14 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_)  }  void zmq::object_t::send_bind (object_t *destination_, -    reader_t *in_pipe_, writer_t *out_pipe_) +    reader_t *in_pipe_, writer_t *out_pipe_, bool adjust_seqnum_)  {      command_t cmd;      cmd.destination = destination_;      cmd.type = command_t::bind;      cmd.args.bind.in_pipe = in_pipe_;      cmd.args.bind.out_pipe = out_pipe_; +    cmd.args.bind.adjust_seqnum = adjust_seqnum_;      send_command (cmd);  } @@ -263,7 +265,8 @@ void zmq::object_t::process_attach (i_engine *engine_)      zmq_assert (false);  } -void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, +    bool adjust_seqnum_)  {      zmq_assert (false);  } diff --git a/src/object.hpp b/src/object.hpp index 2f6c0c4..4fd0a8e 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -68,8 +68,8 @@ namespace zmq              class owned_t *object_);          void send_attach (class session_t *destination_,              struct i_engine *engine_); -        void send_bind (object_t *destination_, -            class reader_t *in_pipe_, class writer_t *out_pipe_); +        void send_bind (object_t *destination_, class reader_t *in_pipe_, +             class writer_t *out_pipe_, bool adjust_seqnum_);          void send_revive (class object_t *destination_);          void send_pipe_term (class writer_t *destination_);          void send_pipe_term_ack (class reader_t *destination_); @@ -85,7 +85,7 @@ namespace zmq          virtual void process_own (class owned_t *object_);          virtual void process_attach (struct i_engine *engine_);          virtual void process_bind (class reader_t *in_pipe_, -            class writer_t *out_pipe_); +            class writer_t *out_pipe_, bool adjust_seqnum_);          virtual void process_revive ();          virtual void process_pipe_term ();          virtual void process_pipe_term_ack (); diff --git a/src/session.cpp b/src/session.cpp index 388437b..f62de27 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -155,8 +155,10 @@ void zmq::session_t::process_plug ()              out_pipe->set_endpoint (this);          } +        //  Note that initial call to inc_seqnum was optimised out. Last +        //  parameter conveys the fact to the callee.          send_bind (owner, outbound ? &outbound->reader : NULL, -            inbound ? &inbound->writer : NULL); +            inbound ? &inbound->writer : NULL, false);      }      owned_t::process_plug (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index e242e05..a614759 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -158,10 +158,10 @@ int zmq::socket_base_t::connect (const char *addr_)              out_pipe ? &out_pipe->writer : NULL);          //  Attach the pipes to the peer socket. Note that peer's seqnum -        //  was incremented in find_endpoint function. When this command -        //  is delivered, peer will consider the seqnum to be processed. +        //  was incremented in find_endpoint function. The callee is notified +        //  about the fact via the last parameter.          send_bind (peer, out_pipe ? &out_pipe->reader : NULL, -            in_pipe ? &in_pipe->writer : NULL); +            in_pipe ? &in_pipe->writer : NULL, true);          return 0;      } @@ -509,8 +509,16 @@ void zmq::socket_base_t::process_own (owned_t *object_)      io_objects.insert (object_);  } -void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, +     bool adjust_seqnum_)  { +    //  In case of inproc transport, the seqnum should catch up here. +    //  For other transports the seqnum modification can be optimised out +    //  because final handshaking between the socket and the session ensures +    //  that no 'bind' command will be left unprocessed. +    if (adjust_seqnum_) +        processed_seqnum++; +      attach_pipes (in_pipe_, out_pipe_);  } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index c766bda..dd7b526 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -114,7 +114,8 @@ namespace zmq          //  Handlers for incoming commands.          void process_own (class owned_t *object_); -        void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_); +        void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_, +            bool adjust_seqnum_);          void process_term_req (class owned_t *object_);          void process_term_ack (); | 
