From d13933bc62fce71b5a58118020e0dd3776e79aa9 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 11 Aug 2010 14:09:56 +0200 Subject: I/O object hierarchy implemented --- src/xrep.cpp | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) (limited to 'src/xrep.cpp') diff --git a/src/xrep.cpp b/src/xrep.cpp index 73d7856..c1cfb00 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -29,7 +29,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t slot_) : prefetched (false), more_in (false), current_out (NULL), - more_out (false) + more_out (false), + terminating (false) { options.requires_in = true; options.requires_out = true; @@ -62,16 +63,27 @@ void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, inpipe_t inpipe = {inpipe_, peer_identity_, true}; inpipes.push_back (inpipe); + + if (terminating) { + register_term_acks (1); + inpipe_->terminate (); + } } -void zmq::xrep_t::xterm_pipes () +void zmq::xrep_t::process_term () { + terminating = true; + + register_term_acks (inpipes.size () + outpipes.size ()); + for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); it++) it->reader->terminate (); for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); it++) it->second.writer->terminate (); + + socket_base_t::process_term (); } void zmq::xrep_t::terminated (reader_t *pipe_) @@ -80,6 +92,8 @@ void zmq::xrep_t::terminated (reader_t *pipe_) it++) { if (it->reader == pipe_) { inpipes.erase (it); + if (terminating) + unregister_term_ack (); return; } } @@ -94,17 +108,14 @@ void zmq::xrep_t::terminated (writer_t *pipe_) outpipes.erase (it); if (pipe_ == current_out) current_out = NULL; + if (terminating) + unregister_term_ack (); return; } } zmq_assert (false); } -bool zmq::xrep_t::xhas_pipes () -{ - return !inpipes.empty () || !outpipes.empty (); -} - void zmq::xrep_t::activated (reader_t *pipe_) { for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); -- cgit v1.2.3