diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-05-22 17:26:53 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-05-22 17:26:53 +0200 |
commit | acf0b0e515515e51ad32ba7a2d147ce703579478 (patch) | |
tree | d2032009cf46c23aa0f677c2216914f718ab968a /src/socket_base.cpp | |
parent | 9e6b39925603f9e64db08c469bd628d7ef9465de (diff) |
Introduces bi-directional pipes
So far, there was a pair of unidirectional pipes between a socket
and a session (or an inproc peer). This resulted in complex
problems with half-closed states and tracking which inpipe
corresponds to which outpipe.
This patch doesn't add any functionality in itself, but is
essential for further work on features like subscription
forwarding.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r-- | src/socket_base.cpp | 73 |
1 files changed, 29 insertions, 44 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 3e104a8..baa4bd2 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -211,17 +211,17 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) return 0; } -void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) +void zmq::socket_base_t::attach_pipe (class pipe_t *pipe_, + const blob_t &peer_identity_) { // If the peer haven't specified it's identity, let's generate one. if (peer_identity_.size ()) { - xattach_pipes (inpipe_, outpipe_, peer_identity_); + xattach_pipe (pipe_, peer_identity_); } else { blob_t identity (17, 0); generate_uuid ((unsigned char*) identity.data () + 1); - xattach_pipes (inpipe_, outpipe_, identity); + xattach_pipe (pipe_, identity); } } @@ -378,11 +378,6 @@ int zmq::socket_base_t::connect (const char *addr_) if (!peer.socket) return -1; - reader_t *inpipe_reader = NULL; - writer_t *inpipe_writer = NULL; - reader_t *outpipe_reader = NULL; - writer_t *outpipe_writer = NULL; - // The total HWM for an inproc connection should be the sum of // the binder's HWM and the connector's HWM. int sndhwm; @@ -396,24 +391,21 @@ int zmq::socket_base_t::connect (const char *addr_) else rcvhwm = options.rcvhwm + peer.options.sndhwm; - // Create inbound pipe, if required. - if (options.requires_in) - create_pipe (this, peer.socket, rcvhwm, &inpipe_reader, - &inpipe_writer); - - // Create outbound pipe, if required. - if (options.requires_out) - create_pipe (peer.socket, this, sndhwm, &outpipe_reader, - &outpipe_writer); + // Create a bi-directional pipe to connect the peers. + object_t *parents [2] = {this, peer.socket}; + pipe_t *pipes [2] = {NULL, NULL}; + int hwms [2] = {sndhwm, rcvhwm}; + bool delays [2] = {true, true}; + int rc = pipepair (parents, pipes, hwms, delays); + errno_assert (rc == 0); - // Attach the pipes to this socket object. - attach_pipes (inpipe_reader, outpipe_writer, peer.options.identity); + // Attach local end of the pipe to this socket object. + attach_pipe (pipes [0], peer.options.identity); - // Attach the pipes to the peer socket. Note that peer's seqnum - // was incremented in find_endpoint function. We don't need it + // Attach remote end of the pipe to the peer socket. Note that peer's + // seqnum was incremented in find_endpoint function. We don't need it // increased here. - send_bind (peer.socket, outpipe_reader, inpipe_writer, - options.identity, false); + send_bind (peer.socket, pipes [1], options.identity, false); return 0; } @@ -435,26 +427,19 @@ int zmq::socket_base_t::connect (const char *addr_) // session once the connection is established. if (options.immediate_connect) { - reader_t *inpipe_reader = NULL; - writer_t *inpipe_writer = NULL; - reader_t *outpipe_reader = NULL; - writer_t *outpipe_writer = NULL; - - // Create inbound pipe, if required. - if (options.requires_in) - create_pipe (this, session, options.rcvhwm, - &inpipe_reader, &inpipe_writer); - - // Create outbound pipe, if required. - if (options.requires_out) - create_pipe (session, this, options.sndhwm, - &outpipe_reader, &outpipe_writer); + // Create a bi-directional pipe. + object_t *parents [2] = {this, session}; + pipe_t *pipes [2] = {NULL, NULL}; + int hwms [2] = {options.sndhwm, options.rcvhwm}; + bool delays [2] = {true, true}; + int rc = pipepair (parents, pipes, hwms, delays); + errno_assert (rc == 0); - // Attach the pipes to the socket object. - attach_pipes (inpipe_reader, outpipe_writer, blob_t ()); + // Attach local end of the pipe to the socket object. + attach_pipe (pipes [0], blob_t ()); - // Attach the pipes to the session object. - session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ()); + // Attach remote end of the pipe to the session object. + session->attach_pipe (pipes [1], blob_t ()); } // Activate the session. Make it a child of this socket. @@ -718,10 +703,10 @@ void zmq::socket_base_t::process_stop () ctx_terminated = true; } -void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, +void zmq::socket_base_t::process_bind (pipe_t *pipe_, const blob_t &peer_identity_) { - attach_pipes (in_pipe_, out_pipe_, peer_identity_); + attach_pipe (pipe_, peer_identity_); } void zmq::socket_base_t::process_unplug () |