summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-02-14 13:34:48 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-02-14 13:34:48 +0100
commit43620b3d35e3f3a6e49046fdc0426651bf58dedb (patch)
tree144d680f6dec4c0773133a42900eb6acaf95282e
parent4405250d93a2ad6eb3940c4bc4fe8ea32bd52f9e (diff)
Multi-hop REQ/REP, part X., optional delayed creation of pipes during connect
-rw-r--r--src/options.cpp1
-rw-r--r--src/options.hpp6
-rw-r--r--src/rep.cpp5
-rw-r--r--src/session.cpp48
-rw-r--r--src/socket_base.cpp55
-rw-r--r--src/xrep.cpp4
6 files changed, 74 insertions, 45 deletions
diff --git a/src/options.cpp b/src/options.cpp
index a70b9a3..b77af24 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -34,6 +34,7 @@ zmq::options_t::options_t () :
rcvbuf (0),
requires_in (false),
requires_out (false),
+ immediate_connect (true),
traceroute (false)
{
}
diff --git a/src/options.hpp b/src/options.hpp
index 541e6e8..6d9be4d 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -56,6 +56,12 @@ namespace zmq
bool requires_in;
bool requires_out;
+ // If true, when connecting, pipes are created immediately without
+ // waiting for the connection to be established. That way the socket
+ // is not aware of the peer's identity, however, it is able to send
+ // messages straight away.
+ bool immediate_connect;
+
// If true, socket requires tracerouting the messages.
bool traceroute;
};
diff --git a/src/rep.cpp b/src/rep.cpp
index b6bffae..8c5b86c 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -32,6 +32,11 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) :
{
options.requires_in = true;
options.requires_out = true;
+
+ // We don't need immediate connect. We'll be able to send messages
+ // (replies) only when connection is established and thus requests
+ // can arrive anyway.
+ options.immediate_connect = false;
}
zmq::rep_t::~rep_t ()
diff --git a/src/session.cpp b/src/session.cpp
index 909501a..f86327e 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -220,34 +220,32 @@ void zmq::session_t::process_attach (i_engine *engine_,
}
}
- // If session is created by 'connect' function, it has the pipes set
- // already. Otherwise, it's being created by the listener and the pipes
- // are yet to be created.
- if (!in_pipe && !out_pipe) {
-
- pipe_t *inbound = NULL;
- pipe_t *outbound = NULL;
-
- if (options.requires_out) {
- inbound = new (std::nothrow) pipe_t (this, owner,
- options.hwm, options.lwm);
- zmq_assert (inbound);
- in_pipe = &inbound->reader;
- in_pipe->set_endpoint (this);
- }
-
- if (options.requires_in) {
- outbound = new (std::nothrow) pipe_t (owner, this,
- options.hwm, options.lwm);
- zmq_assert (outbound);
- out_pipe = &outbound->writer;
- out_pipe->set_endpoint (this);
- }
+ // Check whether the required pipes already exist. If not so, we'll
+ // create them and bind them to the socket object.
+ reader_t *socket_reader = NULL;
+ writer_t *socket_writer = NULL;
+
+ if (options.requires_in && !out_pipe) {
+ pipe_t *pipe = new (std::nothrow) pipe_t (owner, this,
+ options.hwm, options.lwm);
+ zmq_assert (pipe);
+ out_pipe = &pipe->writer;
+ out_pipe->set_endpoint (this);
+ socket_reader = &pipe->reader;
+ }
- send_bind (owner, outbound ? &outbound->reader : NULL,
- inbound ? &inbound->writer : NULL, peer_identity);
+ if (options.requires_out && !in_pipe) {
+ pipe_t *pipe = new (std::nothrow) pipe_t (this, owner,
+ options.hwm, options.lwm);
+ zmq_assert (pipe);
+ in_pipe = &pipe->reader;
+ in_pipe->set_endpoint (this);
+ socket_writer = &pipe->writer;
}
+ if (socket_reader || socket_writer)
+ send_bind (owner, socket_reader, socket_writer, peer_identity);
+
// Plug in the engine.
zmq_assert (!engine);
zmq_assert (engine_);
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 1d4eae6..222b769 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -141,6 +141,10 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "inproc") {
+ // TODO: inproc connect is specific with respect to creating pipes
+ // as there's no 'reconnect' functionality implemented. Once that
+ // is in place we should follow generic pipe creation algorithm.
+
// Find the peer socket.
socket_base_t *peer = find_endpoint (addr_args.c_str ());
if (!peer)
@@ -182,31 +186,37 @@ int zmq::socket_base_t::connect (const char *addr_)
this, options);
zmq_assert (session);
- pipe_t *in_pipe = NULL;
- pipe_t *out_pipe = NULL;
+ // If 'immediate connect' feature is required, we'll created the pipes
+ // to the session straight away. Otherwise, they'll be created by the
+ // session once the connection is established.
+ if (options.immediate_connect) {
- // Create inbound pipe, if required.
- if (options.requires_in) {
- in_pipe = new (std::nothrow) pipe_t (this, session,
- options.hwm, options.lwm);
- zmq_assert (in_pipe);
+ pipe_t *in_pipe = NULL;
+ pipe_t *out_pipe = NULL;
- }
+ // Create inbound pipe, if required.
+ if (options.requires_in) {
+ in_pipe = new (std::nothrow) pipe_t (this, session,
+ options.hwm, options.lwm);
+ zmq_assert (in_pipe);
- // Create outbound pipe, if required.
- if (options.requires_out) {
- out_pipe = new (std::nothrow) pipe_t (session, this,
- options.hwm, options.lwm);
- zmq_assert (out_pipe);
- }
+ }
- // Attach the pipes to the socket object.
- attach_pipes (in_pipe ? &in_pipe->reader : NULL,
- out_pipe ? &out_pipe->writer : NULL);
+ // Create outbound pipe, if required.
+ if (options.requires_out) {
+ out_pipe = new (std::nothrow) pipe_t (session, this,
+ options.hwm, options.lwm);
+ zmq_assert (out_pipe);
+ }
- // Attach the pipes to the session object.
- session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
- in_pipe ? &in_pipe->writer : NULL);
+ // Attach the pipes to the socket object.
+ attach_pipes (in_pipe ? &in_pipe->reader : NULL,
+ out_pipe ? &out_pipe->writer : NULL);
+
+ // Attach the pipes to the session object.
+ session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
+ in_pipe ? &in_pipe->writer : NULL);
+ }
// Activate the session.
send_plug (session);
@@ -215,6 +225,8 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "tcp" || addr_type == "ipc") {
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+ // Windows named pipes are not compatible with Winsock API.
+ // There's no UNIX domain socket implementation on OpenVMS.
if (addr_type == "ipc") {
errno = EPROTONOSUPPORT;
return -1;
@@ -254,6 +266,9 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "udp")
udp_encapsulation = true;
+ // At this point we'll create message pipes to the session straight
+ // away. There's no point in delaying it as no concept of 'connect'
+ // exists with PGM anyway.
if (options.requires_out) {
// PGM sender.
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 9462a60..328a832 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -28,6 +28,10 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
options.requires_in = true;
options.requires_out = true;
+ // On connect, pipes are created only after initial handshaking.
+ // That way we are aware of the peer's identity when binding to the pipes.
+ options.immediate_connect = false;
+
// XREP socket adds identity to inbound messages and strips identity
// from the outbound messages.
options.traceroute = true;