From acf0b0e515515e51ad32ba7a2d147ce703579478 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 22 May 2011 17:26:53 +0200 Subject: Introduces bi-directional pipes So far, there was a pair of unidirectional pipes between a socket and a session (or an inproc peer). This resulted in complex problems with half-closed states and tracking which inpipe corresponds to which outpipe. This patch doesn't add any functionality in itself, but is essential for further work on features like subscription forwarding. Signed-off-by: Martin Sustrik --- src/xrep.cpp | 90 +++++++++++++++++++++++------------------------------------- 1 file changed, 34 insertions(+), 56 deletions(-) (limited to 'src/xrep.cpp') diff --git a/src/xrep.cpp b/src/xrep.cpp index 2650f4e..d82890d 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -32,8 +32,6 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : terminating (false) { options.type = ZMQ_XREP; - options.requires_in = true; - options.requires_out = true; // On connect, pipes are created only after initial handshaking. // That way we are aware of the peer's identity when binding to the pipes. @@ -46,36 +44,26 @@ zmq::xrep_t::~xrep_t () zmq_assert (outpipes.empty ()); } -void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, - const blob_t &peer_identity_) +void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { - if (outpipe_) { - - outpipe_->set_event_sink (this); - - // TODO: What if new connection has same peer identity as the old one? - outpipe_t outpipe = {outpipe_, true}; - bool ok = outpipes.insert (outpipes_t::value_type ( - peer_identity_, outpipe)).second; - zmq_assert (ok); - - if (terminating) { - register_term_acks (1); - outpipe_->terminate (); - } - } - - if (inpipe_) { - - inpipe_->set_event_sink (this); - - inpipe_t inpipe = {inpipe_, peer_identity_, true}; - inpipes.push_back (inpipe); - - if (terminating) { - register_term_acks (1); - inpipe_->terminate (); - } + zmq_assert (pipe_); + pipe_->set_event_sink (this); + + // Add the pipe to the map out outbound pipes. + // TODO: What if new connection has same peer identity as the old one? + outpipe_t outpipe = {pipe_, true}; + bool ok = outpipes.insert (outpipes_t::value_type ( + peer_identity_, outpipe)).second; + zmq_assert (ok); + + // Add the pipe to the list of inbound pipes. + inpipe_t inpipe = {pipe_, peer_identity_, true}; + inpipes.push_back (inpipe); + + // In case we are already terminating, ask this pipe to terminate as well. + if (terminating) { + register_term_acks (1); + pipe_->terminate (); } } @@ -85,21 +73,17 @@ void zmq::xrep_t::process_term (int linger_) register_term_acks ((int) (inpipes.size () + outpipes.size ())); - for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); - ++it) - it->reader->terminate (); - for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); - ++it) - it->second.writer->terminate (); + for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it) + it->pipe->terminate (); socket_base_t::process_term (linger_); } -void zmq::xrep_t::terminated (reader_t *pipe_) +void zmq::xrep_t::terminated (pipe_t *pipe_) { for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it) { - if (it->reader == pipe_) { + if (it->pipe == pipe_) { if ((inpipes_t::size_type) (it - inpipes.begin ()) < current_in) current_in--; inpipes.erase (it); @@ -107,17 +91,15 @@ void zmq::xrep_t::terminated (reader_t *pipe_) current_in = 0; if (terminating) unregister_term_ack (); - return; + goto clean_outpipes; } } zmq_assert (false); -} -void zmq::xrep_t::terminated (writer_t *pipe_) -{ +clean_outpipes: for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); ++it) { - if (it->second.writer == pipe_) { + if (it->second.pipe == pipe_) { outpipes.erase (it); if (pipe_ == current_out) current_out = NULL; @@ -129,15 +111,11 @@ void zmq::xrep_t::terminated (writer_t *pipe_) zmq_assert (false); } -void zmq::xrep_t::delimited (reader_t *pipe_) -{ -} - -void zmq::xrep_t::activated (reader_t *pipe_) +void zmq::xrep_t::read_activated (pipe_t *pipe_) { for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it) { - if (it->reader == pipe_) { + if (it->pipe == pipe_) { zmq_assert (!it->active); it->active = true; return; @@ -146,11 +124,11 @@ void zmq::xrep_t::activated (reader_t *pipe_) zmq_assert (false); } -void zmq::xrep_t::activated (writer_t *pipe_) +void zmq::xrep_t::write_activated (pipe_t *pipe_) { for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); ++it) { - if (it->second.writer == pipe_) { + if (it->second.pipe == pipe_) { zmq_assert (!it->second.active); it->second.active = true; return; @@ -178,7 +156,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) outpipes_t::iterator it = outpipes.find (identity); if (it != outpipes.end ()) { - current_out = it->second.writer; + current_out = it->second.pipe; msg_t empty; int rc = empty.init (); errno_assert (rc == 0); @@ -245,7 +223,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) // If we are in the middle of reading a message, just grab next part of it. if (more_in) { zmq_assert (inpipes [current_in].active); - bool fetched = inpipes [current_in].reader->read (msg_); + bool fetched = inpipes [current_in].pipe->read (msg_); zmq_assert (fetched); more_in = msg_->flags () & msg_t::more; if (!more_in) { @@ -261,7 +239,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) // Try to fetch new message. if (inpipes [current_in].active) - prefetched = inpipes [current_in].reader->read (&prefetched_msg); + prefetched = inpipes [current_in].pipe->read (&prefetched_msg); // If we have a message, create a prefix and return it to the caller. if (prefetched) { @@ -311,7 +289,7 @@ bool zmq::xrep_t::xhas_in () // pipe holding messages, skipping only pipes with no messages available. for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) { if (inpipes [current_in].active && - inpipes [current_in].reader->check_read ()) + inpipes [current_in].pipe->check_read ()) return true; // If me don't have a message, mark the pipe as passive and -- cgit v1.2.3