summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-05-22 17:26:53 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-05-22 17:26:53 +0200
commitacf0b0e515515e51ad32ba7a2d147ce703579478 (patch)
treed2032009cf46c23aa0f677c2216914f718ab968a /src/socket_base.cpp
parent9e6b39925603f9e64db08c469bd628d7ef9465de (diff)
Introduces bi-directional pipes
So far, there was a pair of unidirectional pipes between a socket and a session (or an inproc peer). This resulted in complex problems with half-closed states and tracking which inpipe corresponds to which outpipe. This patch doesn't add any functionality in itself, but is essential for further work on features like subscription forwarding. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp73
1 files changed, 29 insertions, 44 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 3e104a8..baa4bd2 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -211,17 +211,17 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
return 0;
}
-void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_, const blob_t &peer_identity_)
+void zmq::socket_base_t::attach_pipe (class pipe_t *pipe_,
+ const blob_t &peer_identity_)
{
// If the peer haven't specified it's identity, let's generate one.
if (peer_identity_.size ()) {
- xattach_pipes (inpipe_, outpipe_, peer_identity_);
+ xattach_pipe (pipe_, peer_identity_);
}
else {
blob_t identity (17, 0);
generate_uuid ((unsigned char*) identity.data () + 1);
- xattach_pipes (inpipe_, outpipe_, identity);
+ xattach_pipe (pipe_, identity);
}
}
@@ -378,11 +378,6 @@ int zmq::socket_base_t::connect (const char *addr_)
if (!peer.socket)
return -1;
- reader_t *inpipe_reader = NULL;
- writer_t *inpipe_writer = NULL;
- reader_t *outpipe_reader = NULL;
- writer_t *outpipe_writer = NULL;
-
// The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM.
int sndhwm;
@@ -396,24 +391,21 @@ int zmq::socket_base_t::connect (const char *addr_)
else
rcvhwm = options.rcvhwm + peer.options.sndhwm;
- // Create inbound pipe, if required.
- if (options.requires_in)
- create_pipe (this, peer.socket, rcvhwm, &inpipe_reader,
- &inpipe_writer);
-
- // Create outbound pipe, if required.
- if (options.requires_out)
- create_pipe (peer.socket, this, sndhwm, &outpipe_reader,
- &outpipe_writer);
+ // Create a bi-directional pipe to connect the peers.
+ object_t *parents [2] = {this, peer.socket};
+ pipe_t *pipes [2] = {NULL, NULL};
+ int hwms [2] = {sndhwm, rcvhwm};
+ bool delays [2] = {true, true};
+ int rc = pipepair (parents, pipes, hwms, delays);
+ errno_assert (rc == 0);
- // Attach the pipes to this socket object.
- attach_pipes (inpipe_reader, outpipe_writer, peer.options.identity);
+ // Attach local end of the pipe to this socket object.
+ attach_pipe (pipes [0], peer.options.identity);
- // Attach the pipes to the peer socket. Note that peer's seqnum
- // was incremented in find_endpoint function. We don't need it
+ // Attach remote end of the pipe to the peer socket. Note that peer's
+ // seqnum was incremented in find_endpoint function. We don't need it
// increased here.
- send_bind (peer.socket, outpipe_reader, inpipe_writer,
- options.identity, false);
+ send_bind (peer.socket, pipes [1], options.identity, false);
return 0;
}
@@ -435,26 +427,19 @@ int zmq::socket_base_t::connect (const char *addr_)
// session once the connection is established.
if (options.immediate_connect) {
- reader_t *inpipe_reader = NULL;
- writer_t *inpipe_writer = NULL;
- reader_t *outpipe_reader = NULL;
- writer_t *outpipe_writer = NULL;
-
- // Create inbound pipe, if required.
- if (options.requires_in)
- create_pipe (this, session, options.rcvhwm,
- &inpipe_reader, &inpipe_writer);
-
- // Create outbound pipe, if required.
- if (options.requires_out)
- create_pipe (session, this, options.sndhwm,
- &outpipe_reader, &outpipe_writer);
+ // Create a bi-directional pipe.
+ object_t *parents [2] = {this, session};
+ pipe_t *pipes [2] = {NULL, NULL};
+ int hwms [2] = {options.sndhwm, options.rcvhwm};
+ bool delays [2] = {true, true};
+ int rc = pipepair (parents, pipes, hwms, delays);
+ errno_assert (rc == 0);
- // Attach the pipes to the socket object.
- attach_pipes (inpipe_reader, outpipe_writer, blob_t ());
+ // Attach local end of the pipe to the socket object.
+ attach_pipe (pipes [0], blob_t ());
- // Attach the pipes to the session object.
- session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ());
+ // Attach remote end of the pipe to the session object.
+ session->attach_pipe (pipes [1], blob_t ());
}
// Activate the session. Make it a child of this socket.
@@ -718,10 +703,10 @@ void zmq::socket_base_t::process_stop ()
ctx_terminated = true;
}
-void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+void zmq::socket_base_t::process_bind (pipe_t *pipe_,
const blob_t &peer_identity_)
{
- attach_pipes (in_pipe_, out_pipe_, peer_identity_);
+ attach_pipe (pipe_, peer_identity_);
}
void zmq::socket_base_t::process_unplug ()