From 43620b3d35e3f3a6e49046fdc0426651bf58dedb Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 14 Feb 2010 13:34:48 +0100 Subject: Multi-hop REQ/REP, part X., optional delayed creation of pipes during connect --- src/options.cpp | 1 + src/options.hpp | 6 ++++++ src/rep.cpp | 5 +++++ src/session.cpp | 48 ++++++++++++++++++++++------------------------ src/socket_base.cpp | 55 ++++++++++++++++++++++++++++++++++------------------- src/xrep.cpp | 4 ++++ 6 files changed, 74 insertions(+), 45 deletions(-) diff --git a/src/options.cpp b/src/options.cpp index a70b9a3..b77af24 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -34,6 +34,7 @@ zmq::options_t::options_t () : rcvbuf (0), requires_in (false), requires_out (false), + immediate_connect (true), traceroute (false) { } diff --git a/src/options.hpp b/src/options.hpp index 541e6e8..6d9be4d 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -56,6 +56,12 @@ namespace zmq bool requires_in; bool requires_out; + // If true, when connecting, pipes are created immediately without + // waiting for the connection to be established. That way the socket + // is not aware of the peer's identity, however, it is able to send + // messages straight away. + bool immediate_connect; + // If true, socket requires tracerouting the messages. bool traceroute; }; diff --git a/src/rep.cpp b/src/rep.cpp index b6bffae..8c5b86c 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -32,6 +32,11 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) : { options.requires_in = true; options.requires_out = true; + + // We don't need immediate connect. We'll be able to send messages + // (replies) only when connection is established and thus requests + // can arrive anyway. + options.immediate_connect = false; } zmq::rep_t::~rep_t () diff --git a/src/session.cpp b/src/session.cpp index 909501a..f86327e 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -220,34 +220,32 @@ void zmq::session_t::process_attach (i_engine *engine_, } } - // If session is created by 'connect' function, it has the pipes set - // already. Otherwise, it's being created by the listener and the pipes - // are yet to be created. - if (!in_pipe && !out_pipe) { - - pipe_t *inbound = NULL; - pipe_t *outbound = NULL; - - if (options.requires_out) { - inbound = new (std::nothrow) pipe_t (this, owner, - options.hwm, options.lwm); - zmq_assert (inbound); - in_pipe = &inbound->reader; - in_pipe->set_endpoint (this); - } - - if (options.requires_in) { - outbound = new (std::nothrow) pipe_t (owner, this, - options.hwm, options.lwm); - zmq_assert (outbound); - out_pipe = &outbound->writer; - out_pipe->set_endpoint (this); - } + // Check whether the required pipes already exist. If not so, we'll + // create them and bind them to the socket object. + reader_t *socket_reader = NULL; + writer_t *socket_writer = NULL; + + if (options.requires_in && !out_pipe) { + pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, + options.hwm, options.lwm); + zmq_assert (pipe); + out_pipe = &pipe->writer; + out_pipe->set_endpoint (this); + socket_reader = &pipe->reader; + } - send_bind (owner, outbound ? &outbound->reader : NULL, - inbound ? &inbound->writer : NULL, peer_identity); + if (options.requires_out && !in_pipe) { + pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, + options.hwm, options.lwm); + zmq_assert (pipe); + in_pipe = &pipe->reader; + in_pipe->set_endpoint (this); + socket_writer = &pipe->writer; } + if (socket_reader || socket_writer) + send_bind (owner, socket_reader, socket_writer, peer_identity); + // Plug in the engine. zmq_assert (!engine); zmq_assert (engine_); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 1d4eae6..222b769 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -141,6 +141,10 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "inproc") { + // TODO: inproc connect is specific with respect to creating pipes + // as there's no 'reconnect' functionality implemented. Once that + // is in place we should follow generic pipe creation algorithm. + // Find the peer socket. socket_base_t *peer = find_endpoint (addr_args.c_str ()); if (!peer) @@ -182,31 +186,37 @@ int zmq::socket_base_t::connect (const char *addr_) this, options); zmq_assert (session); - pipe_t *in_pipe = NULL; - pipe_t *out_pipe = NULL; + // If 'immediate connect' feature is required, we'll created the pipes + // to the session straight away. Otherwise, they'll be created by the + // session once the connection is established. + if (options.immediate_connect) { - // Create inbound pipe, if required. - if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, session, - options.hwm, options.lwm); - zmq_assert (in_pipe); + pipe_t *in_pipe = NULL; + pipe_t *out_pipe = NULL; - } + // Create inbound pipe, if required. + if (options.requires_in) { + in_pipe = new (std::nothrow) pipe_t (this, session, + options.hwm, options.lwm); + zmq_assert (in_pipe); - // Create outbound pipe, if required. - if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (session, this, - options.hwm, options.lwm); - zmq_assert (out_pipe); - } + } - // Attach the pipes to the socket object. - attach_pipes (in_pipe ? &in_pipe->reader : NULL, - out_pipe ? &out_pipe->writer : NULL); + // Create outbound pipe, if required. + if (options.requires_out) { + out_pipe = new (std::nothrow) pipe_t (session, this, + options.hwm, options.lwm); + zmq_assert (out_pipe); + } - // Attach the pipes to the session object. - session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, - in_pipe ? &in_pipe->writer : NULL); + // Attach the pipes to the socket object. + attach_pipes (in_pipe ? &in_pipe->reader : NULL, + out_pipe ? &out_pipe->writer : NULL); + + // Attach the pipes to the session object. + session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, + in_pipe ? &in_pipe->writer : NULL); + } // Activate the session. send_plug (session); @@ -215,6 +225,8 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "tcp" || addr_type == "ipc") { #if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS + // Windows named pipes are not compatible with Winsock API. + // There's no UNIX domain socket implementation on OpenVMS. if (addr_type == "ipc") { errno = EPROTONOSUPPORT; return -1; @@ -254,6 +266,9 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "udp") udp_encapsulation = true; + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with PGM anyway. if (options.requires_out) { // PGM sender. diff --git a/src/xrep.cpp b/src/xrep.cpp index 9462a60..328a832 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -28,6 +28,10 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) : options.requires_in = true; options.requires_out = true; + // On connect, pipes are created only after initial handshaking. + // That way we are aware of the peer's identity when binding to the pipes. + options.immediate_connect = false; + // XREP socket adds identity to inbound messages and strips identity // from the outbound messages. options.traceroute = true; -- cgit v1.2.3