diff options
| -rw-r--r-- | src/options.cpp | 1 | ||||
| -rw-r--r-- | src/options.hpp | 6 | ||||
| -rw-r--r-- | src/rep.cpp | 5 | ||||
| -rw-r--r-- | src/session.cpp | 48 | ||||
| -rw-r--r-- | src/socket_base.cpp | 55 | ||||
| -rw-r--r-- | src/xrep.cpp | 4 | 
6 files changed, 74 insertions, 45 deletions
| diff --git a/src/options.cpp b/src/options.cpp index a70b9a3..b77af24 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -34,6 +34,7 @@ zmq::options_t::options_t () :      rcvbuf (0),      requires_in (false),      requires_out (false), +    immediate_connect (true),      traceroute (false)  {  } diff --git a/src/options.hpp b/src/options.hpp index 541e6e8..6d9be4d 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -56,6 +56,12 @@ namespace zmq          bool requires_in;          bool requires_out; +        //  If true, when connecting, pipes are created immediately without +        //  waiting for the connection to be established. That way the socket +        //  is not aware of the peer's identity, however, it is able to send +        //  messages straight away. +        bool immediate_connect; +          //  If true, socket requires tracerouting the messages.          bool traceroute;      }; diff --git a/src/rep.cpp b/src/rep.cpp index b6bffae..8c5b86c 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -32,6 +32,11 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) :  {      options.requires_in = true;      options.requires_out = true; + +    //  We don't need immediate connect. We'll be able to send messages +    //  (replies) only when connection is established and thus requests +    //  can arrive anyway. +    options.immediate_connect = false;  }  zmq::rep_t::~rep_t () diff --git a/src/session.cpp b/src/session.cpp index 909501a..f86327e 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -220,34 +220,32 @@ void zmq::session_t::process_attach (i_engine *engine_,          }      } -    //  If session is created by 'connect' function, it has the pipes set -    //  already. Otherwise, it's being created by the listener and the pipes -    //  are yet to be created. -    if (!in_pipe && !out_pipe) { - -        pipe_t *inbound = NULL; -        pipe_t *outbound = NULL; - -        if (options.requires_out) { -            inbound = new (std::nothrow) pipe_t (this, owner, -                options.hwm, options.lwm); -            zmq_assert (inbound); -            in_pipe = &inbound->reader; -            in_pipe->set_endpoint (this); -        } - -        if (options.requires_in) { -            outbound = new (std::nothrow) pipe_t (owner, this, -                options.hwm, options.lwm); -            zmq_assert (outbound); -            out_pipe = &outbound->writer; -            out_pipe->set_endpoint (this); -        } +    //  Check whether the required pipes already exist. If not so, we'll +    //  create them and bind them to the socket object. +    reader_t *socket_reader = NULL; +    writer_t *socket_writer = NULL; + +    if (options.requires_in && !out_pipe) { +        pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, +            options.hwm, options.lwm); +        zmq_assert (pipe); +        out_pipe = &pipe->writer; +        out_pipe->set_endpoint (this); +        socket_reader = &pipe->reader; +    } -        send_bind (owner, outbound ? &outbound->reader : NULL, -            inbound ? &inbound->writer : NULL, peer_identity); +    if (options.requires_out && !in_pipe) { +        pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, +            options.hwm, options.lwm); +        zmq_assert (pipe); +        in_pipe = &pipe->reader; +        in_pipe->set_endpoint (this); +        socket_writer = &pipe->writer;      } +    if (socket_reader || socket_writer) +        send_bind (owner, socket_reader, socket_writer, peer_identity); +      //  Plug in the engine.      zmq_assert (!engine);      zmq_assert (engine_); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 1d4eae6..222b769 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -141,6 +141,10 @@ int zmq::socket_base_t::connect (const char *addr_)      if (addr_type == "inproc") { +        //  TODO: inproc connect is specific with respect to creating pipes +        //  as there's no 'reconnect' functionality implemented. Once that +        //  is in place we should follow generic pipe creation algorithm. +          //  Find the peer socket.          socket_base_t *peer = find_endpoint (addr_args.c_str ());          if (!peer) @@ -182,31 +186,37 @@ int zmq::socket_base_t::connect (const char *addr_)          this, options);      zmq_assert (session); -    pipe_t *in_pipe = NULL; -    pipe_t *out_pipe = NULL; +    //  If 'immediate connect' feature is required, we'll created the pipes +    //  to the session straight away. Otherwise, they'll be created by the +    //  session once the connection is established. +    if (options.immediate_connect) { -    //  Create inbound pipe, if required. -    if (options.requires_in) { -        in_pipe = new (std::nothrow) pipe_t (this, session, -            options.hwm, options.lwm); -        zmq_assert (in_pipe); +        pipe_t *in_pipe = NULL; +        pipe_t *out_pipe = NULL; -    } +        //  Create inbound pipe, if required. +        if (options.requires_in) { +            in_pipe = new (std::nothrow) pipe_t (this, session, +                options.hwm, options.lwm); +            zmq_assert (in_pipe); -    //  Create outbound pipe, if required. -    if (options.requires_out) { -        out_pipe = new (std::nothrow) pipe_t (session, this, -            options.hwm, options.lwm); -        zmq_assert (out_pipe); -    } +        } -    //  Attach the pipes to the socket object. -    attach_pipes (in_pipe ? &in_pipe->reader : NULL, -        out_pipe ? &out_pipe->writer : NULL); +        //  Create outbound pipe, if required. +        if (options.requires_out) { +            out_pipe = new (std::nothrow) pipe_t (session, this, +                options.hwm, options.lwm); +            zmq_assert (out_pipe); +        } -    //  Attach the pipes to the session object. -    session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, -        in_pipe ? &in_pipe->writer : NULL); +        //  Attach the pipes to the socket object. +        attach_pipes (in_pipe ? &in_pipe->reader : NULL, +            out_pipe ? &out_pipe->writer : NULL); + +        //  Attach the pipes to the session object. +        session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, +            in_pipe ? &in_pipe->writer : NULL); +    }      //  Activate the session.      send_plug (session); @@ -215,6 +225,8 @@ int zmq::socket_base_t::connect (const char *addr_)      if (addr_type == "tcp" || addr_type == "ipc") {  #if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS +        //  Windows named pipes are not compatible with Winsock API. +        //  There's no UNIX domain socket implementation on OpenVMS.          if (addr_type == "ipc") {              errno = EPROTONOSUPPORT;              return -1; @@ -254,6 +266,9 @@ int zmq::socket_base_t::connect (const char *addr_)          if (addr_type == "udp")              udp_encapsulation = true; +        //  At this point we'll create message pipes to the session straight +        //  away. There's no point in delaying it as no concept of 'connect' +        //  exists with PGM anyway.          if (options.requires_out) {              //  PGM sender. diff --git a/src/xrep.cpp b/src/xrep.cpp index 9462a60..328a832 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -28,6 +28,10 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) :      options.requires_in = true;      options.requires_out = true; +    //  On connect, pipes are created only after initial handshaking. +    //  That way we are aware of the peer's identity when binding to the pipes. +    options.immediate_connect = false; +      //  XREP socket adds identity to inbound messages and strips identity      //  from the outbound messages.      options.traceroute = true; | 
