From 43620b3d35e3f3a6e49046fdc0426651bf58dedb Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 14 Feb 2010 13:34:48 +0100 Subject: Multi-hop REQ/REP, part X., optional delayed creation of pipes during connect --- src/socket_base.cpp | 55 ++++++++++++++++++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 20 deletions(-) (limited to 'src/socket_base.cpp') diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 1d4eae6..222b769 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -141,6 +141,10 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "inproc") { + // TODO: inproc connect is specific with respect to creating pipes + // as there's no 'reconnect' functionality implemented. Once that + // is in place we should follow generic pipe creation algorithm. + // Find the peer socket. socket_base_t *peer = find_endpoint (addr_args.c_str ()); if (!peer) @@ -182,31 +186,37 @@ int zmq::socket_base_t::connect (const char *addr_) this, options); zmq_assert (session); - pipe_t *in_pipe = NULL; - pipe_t *out_pipe = NULL; + // If 'immediate connect' feature is required, we'll created the pipes + // to the session straight away. Otherwise, they'll be created by the + // session once the connection is established. + if (options.immediate_connect) { - // Create inbound pipe, if required. - if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, session, - options.hwm, options.lwm); - zmq_assert (in_pipe); + pipe_t *in_pipe = NULL; + pipe_t *out_pipe = NULL; - } + // Create inbound pipe, if required. + if (options.requires_in) { + in_pipe = new (std::nothrow) pipe_t (this, session, + options.hwm, options.lwm); + zmq_assert (in_pipe); - // Create outbound pipe, if required. - if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (session, this, - options.hwm, options.lwm); - zmq_assert (out_pipe); - } + } - // Attach the pipes to the socket object. - attach_pipes (in_pipe ? &in_pipe->reader : NULL, - out_pipe ? &out_pipe->writer : NULL); + // Create outbound pipe, if required. + if (options.requires_out) { + out_pipe = new (std::nothrow) pipe_t (session, this, + options.hwm, options.lwm); + zmq_assert (out_pipe); + } - // Attach the pipes to the session object. - session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, - in_pipe ? &in_pipe->writer : NULL); + // Attach the pipes to the socket object. + attach_pipes (in_pipe ? &in_pipe->reader : NULL, + out_pipe ? &out_pipe->writer : NULL); + + // Attach the pipes to the session object. + session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, + in_pipe ? &in_pipe->writer : NULL); + } // Activate the session. send_plug (session); @@ -215,6 +225,8 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "tcp" || addr_type == "ipc") { #if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS + // Windows named pipes are not compatible with Winsock API. + // There's no UNIX domain socket implementation on OpenVMS. if (addr_type == "ipc") { errno = EPROTONOSUPPORT; return -1; @@ -254,6 +266,9 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "udp") udp_encapsulation = true; + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with PGM anyway. if (options.requires_out) { // PGM sender. -- cgit v1.2.3